From 89519343a7f1f9b66362e32dadc84b3add0c8dcd Mon Sep 17 00:00:00 2001
From: Josh Slocum
Date: Tue, 18 Oct 2022 14:13:35 -0500
Subject: [PATCH] adding new bg read metrics to client and refactoring them
 into a new trace event (#8493)

---
 fdbclient/BlobGranuleFiles.cpp                |  62 +++++++----
 fdbclient/BlobGranuleReader.actor.cpp         |   4 +-
 fdbclient/NativeAPI.actor.cpp                 | 103 +++++++++++-------
 .../include/fdbclient/BlobGranuleCommon.h     |  10 +-
 .../include/fdbclient/BlobGranuleFiles.h      |   3 +-
 fdbclient/include/fdbclient/DatabaseContext.h |  12 +-
 6 files changed, 131 insertions(+), 63 deletions(-)

diff --git a/fdbclient/BlobGranuleFiles.cpp b/fdbclient/BlobGranuleFiles.cpp
index 3747824437..66be4c1462 100644
--- a/fdbclient/BlobGranuleFiles.cpp
+++ b/fdbclient/BlobGranuleFiles.cpp
@@ -1324,7 +1324,8 @@ typedef std::priority_queue<MergeStreamNext, std::vector<MergeStreamNext>, Order
 
 static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
                                      const std::vector<Standalone<VectorRef<ParsedDeltaBoundaryRef>>>& streams,
-                                     const std::vector<bool> startClears) {
+                                     const std::vector<bool> startClears,
+                                     GranuleMaterializeStats& stats) {
 	ASSERT(streams.size() < std::numeric_limits<int16_t>::max());
 	ASSERT(startClears.size() == streams.size());
 
@@ -1357,6 +1358,10 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
 		}
 	}
 
+	if (chunk.snapshotFile.present()) {
+		stats.snapshotRows += streams[0].size();
+	}
+
 	RangeResult result;
 	std::vector<MergeStreamNext> cur;
 	cur.reserve(streams.size());
@@ -1373,6 +1378,7 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
 
 		// un-set clears and find latest value for key (if present)
 		bool foundValue = false;
+		bool includesSnapshot = cur.back().streamIdx == 0 && chunk.snapshotFile.present();
 		for (auto& it : cur) {
 			auto& v = streams[it.streamIdx][it.dataIdx];
 			if (clearActive[it.streamIdx]) {
@@ -1392,6 +1398,13 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
 				KeyRef finalKey =
 				    chunk.tenantPrefix.present() ? v.key.removePrefix(chunk.tenantPrefix.get()) : v.key;
 				result.push_back_deep(result.arena(), KeyValueRef(finalKey, v.value));
+				if (!includesSnapshot) {
+					stats.rowsInserted++;
+				} else if (it.streamIdx > 0) {
+					stats.rowsUpdated++;
+				}
+			} else if (includesSnapshot) {
+				stats.rowsCleared++;
 			}
 		}
 	}
@@ -1413,6 +1426,8 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
 		}
 	}
 
+	stats.outputBytes += result.expectedSize();
+
 	return result;
 }
 
@@ -1421,7 +1436,8 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
                                    Version beginVersion,
                                    Version readVersion,
                                    Optional<StringRef> snapshotData,
-                                   StringRef deltaFileData[]) {
+                                   StringRef deltaFileData[],
+                                   GranuleMaterializeStats& stats) {
 	// TODO REMOVE with early replying
 	ASSERT(readVersion == chunk.includedVersion);
 
@@ -1444,6 +1460,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
 	streams.reserve(chunk.deltaFiles.size() + 2);
 
 	if (snapshotData.present()) {
+		stats.inputBytes += snapshotData.get().size();
 		ASSERT(chunk.snapshotFile.present());
 		Standalone<VectorRef<ParsedDeltaBoundaryRef>> snapshotRows = loadSnapshotFile(chunk.snapshotFile.get().filename,
@@ -1461,6 +1478,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
 		fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
 	}
 	for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
+		stats.inputBytes += deltaFileData[deltaIdx].size();
 		bool startClear = false;
 		auto deltaRows = loadChunkedDeltaFile(chunk.deltaFiles[deltaIdx].filename,
 		                                      deltaFileData[deltaIdx],
@@ -1480,6 +1498,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
 		fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size());
 	}
 	if (!chunk.newDeltas.empty()) {
+		stats.inputBytes += chunk.newDeltas.expectedSize();
 		// TODO REMOVE validation
 		ASSERT(beginVersion <= chunk.newDeltas.front().version);
 		ASSERT(readVersion >= chunk.newDeltas.back().version);
@@ -1491,7 +1510,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
 		}
 	}
 
-	return mergeDeltaStreams(chunk, streams, startClears);
+	return mergeDeltaStreams(chunk, streams, startClears, stats);
 }
 
 struct GranuleLoadFreeHandle : NonCopyable, ReferenceCounted<GranuleLoadFreeHandle> {
@@ -1560,8 +1579,6 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
 	}
 
 	GranuleLoadIds loadIds[files.size()];
 
-	int64_t inputBytes = 0;
-	int64_t outputBytes = 0;
 
 	// Kick off first file reads if parallelism > 1
@@ -1586,7 +1603,6 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
 				if (!snapshotData.get().begin()) {
 					return ErrorOr<RangeResult>(blob_granule_file_load_error());
 				}
-				inputBytes += snapshotData.get().size();
 			}
 
 			// +1 to avoid UBSAN variable length array of size zero
@@ -1599,16 +1615,11 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
 				if (!deltaData[i].begin()) {
 					return ErrorOr<RangeResult>(blob_granule_file_load_error());
 				}
-				inputBytes += deltaData[i].size();
 			}
 
-			inputBytes += files[chunkIdx].newDeltas.expectedSize();
-
 			// materialize rows from chunk
-			chunkRows =
-			    materializeBlobGranule(files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData);
-
-			outputBytes += chunkRows.expectedSize();
+			chunkRows = materializeBlobGranule(
+			    files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData, stats);
 
 			results.arena().dependsOn(chunkRows.arena());
 			results.append(results.arena(), chunkRows.begin(), chunkRows.size());
@@ -1616,8 +1627,6 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
 		}
 
-		stats.inputBytes = inputBytes;
-		stats.outputBytes = outputBytes;
 		return ErrorOr<RangeResult>(results);
 	} catch (Error& e) {
 		return ErrorOr<RangeResult>(e);
 	}
@@ -2303,6 +2312,7 @@ void checkDeltaRead(const KeyValueGen& kvGen,
 	// expected answer
 	std::map<KeyRef, ValueRef> expectedData;
 	Version lastFileEndVersion = 0;
+	GranuleMaterializeStats stats;
 
 	fmt::print("Delta Read [{0} - {1}) @ {2} - {3}\n",
 	           range.begin.printable(),
@@ -2322,7 +2332,7 @@ void checkDeltaRead(const KeyValueGen& kvGen,
 	chunk.includedVersion = readVersion;
 	chunk.snapshotVersion = invalidVersion;
 
-	RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, {}, serialized);
+	RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, {}, serialized, stats);
 
 	if (expectedData.size() != actualData.size()) {
 		fmt::print("Expected Data {0}:\n", expectedData.size());
@@ -2430,6 +2440,7 @@ void checkGranuleRead(const KeyValueGen& kvGen,
 	}
 	Version lastFileEndVersion = 0;
 	applyDeltasByVersion(deltaData, range, beginVersion, readVersion, lastFileEndVersion, expectedData);
+	GranuleMaterializeStats stats;
 
 	// actual answer
 	Standalone<BlobGranuleChunkRef> chunk;
@@ -2477,7 +2488,8 @@ void checkGranuleRead(const KeyValueGen& kvGen,
 	if (beginVersion == 0) {
 		snapshotPtr = serializedSnapshot;
 	}
-	RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, snapshotPtr, deltaPtrs);
+	RangeResult actualData =
+	    materializeBlobGranule(chunk, range, beginVersion, readVersion, snapshotPtr, deltaPtrs, stats);
 
 	if (expectedData.size() != actualData.size()) {
 		fmt::print("Expected Size {0} != Actual Size {1}\n", expectedData.size(), actualData.size());
@@ -2822,6 +2834,7 @@ std::pair<int64_t, double> doReadBench(const FileSet& fileSet,
 	Version readVersion = std::get<1>(fileSet.deltaFiles.back());
 
 	Standalone<BlobGranuleChunkRef> chunk;
+	GranuleMaterializeStats stats;
 	StringRef deltaPtrs[fileSet.deltaFiles.size()];
 
 	MutationRef clearAllAtEndMutation;
@@ -2875,14 +2888,25 @@ std::pair<int64_t, double> doReadBench(const FileSet& fileSet,
 			}
 			serializedBytes += actualData.expectedSize();
 		} else {
-			RangeResult actualData =
-			    materializeBlobGranule(chunk, readRange, 0, readVersion, std::get<2>(fileSet.snapshotFile), deltaPtrs);
+			RangeResult actualData = materializeBlobGranule(
+			    chunk, readRange, 0, readVersion, std::get<2>(fileSet.snapshotFile), deltaPtrs, stats);
 			serializedBytes += actualData.expectedSize();
 		}
 	}
 	elapsed += timer_monotonic();
 	elapsed /= READ_RUNS;
 	serializedBytes /= READ_RUNS;
+
+	// TODO REMOVE
+	fmt::print("Materialize stats:\n");
+	fmt::print("  Input bytes:   {0}\n", stats.inputBytes);
+	fmt::print("  Output bytes:  {0}\n", stats.outputBytes);
+	fmt::print("  Write Amp:     {0}\n", (1.0 * stats.inputBytes) / stats.outputBytes);
+	fmt::print("  Snapshot Rows: {0}\n", stats.snapshotRows);
+	fmt::print("  Rows Cleared:  {0}\n", stats.rowsCleared);
+	fmt::print("  Rows Inserted: {0}\n", stats.rowsInserted);
+	fmt::print("  Rows Updated:  {0}\n", stats.rowsUpdated);
+
 	return { serializedBytes, elapsed };
 }
 
diff --git a/fdbclient/BlobGranuleReader.actor.cpp b/fdbclient/BlobGranuleReader.actor.cpp
index 583da353f7..9ba1ccffdb 100644
--- a/fdbclient/BlobGranuleReader.actor.cpp
+++ b/fdbclient/BlobGranuleReader.actor.cpp
@@ -105,7 +105,9 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
 			arena.dependsOn(data.arena());
 		}
 
-		return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
+		// TODO do something useful with stats?
+		GranuleMaterializeStats stats;
+		return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData, stats);
 
 	} catch (Error& e) {
 		throw e;

diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp
index 16a32097e5..bf5483f82c 100644
--- a/fdbclient/NativeAPI.actor.cpp
+++ b/fdbclient/NativeAPI.actor.cpp
@@ -689,25 +689,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
 			    .detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
 			    .detail("MaxBytesPerCommit", cx->bytesPerCommit.max())
 			    .detail("NumLocalityCacheEntries", cx->locationCache.size());
-			if (cx->anyBlobGranuleRequests) {
-				ev.detail("MeanBGLatency", cx->bgLatencies.mean())
-				    .detail("MedianBGLatency", cx->bgLatencies.median())
-				    .detail("MaxBGLatency", cx->bgLatencies.max())
-				    .detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
-				    .detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
-				    .detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
-			}
 		}
 
-		cx->latencies.clear();
-		cx->readLatencies.clear();
-		cx->GRVLatencies.clear();
-		cx->commitLatencies.clear();
-		cx->mutationsPerCommit.clear();
-		cx->bytesPerCommit.clear();
-		cx->bgLatencies.clear();
-		cx->bgGranulesPerRequest.clear();
-
 		if (cx->usedAnyChangeFeeds && logTraces) {
 			TraceEvent feedEv("ChangeFeedClientMetrics", cx->dbId);
 
@@ -721,6 +704,37 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
 			cx->ccFeed.logToTraceEvent(feedEv);
 		}
 
+		if (cx->anyBGReads && logTraces) {
+			TraceEvent bgReadEv("BlobGranuleReadMetrics", cx->dbId);
+
+			bgReadEv.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
+			    .detail("Cluster",
+			            cx->getConnectionRecord()
+			                ? cx->getConnectionRecord()->getConnectionString().clusterKeyName().toString()
+			                : "")
+			    .detail("Internal", cx->internal);
+
+			// add counters
+			cx->ccBG.logToTraceEvent(bgReadEv);
+
+			// add latencies
+			bgReadEv.detail("MeanBGLatency", cx->bgLatencies.mean())
+			    .detail("MedianBGLatency", cx->bgLatencies.median())
+			    .detail("MaxBGLatency", cx->bgLatencies.max())
+			    .detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
+			    .detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
+			    .detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
+		}
+
+		cx->latencies.clear();
+		cx->readLatencies.clear();
+		cx->GRVLatencies.clear();
+		cx->commitLatencies.clear();
+		cx->mutationsPerCommit.clear();
+		cx->bytesPerCommit.clear();
+		cx->bgLatencies.clear();
+		cx->bgGranulesPerRequest.clear();
+
 		lastLogged = now();
 	}
 }
@@ -1526,17 +1540,21 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
 	    transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
 	    transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
 	    transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
 	    transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
-	    transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
-	    bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
-	    feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
-	    feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
-	    feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
-	    commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
-	    bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
-	    lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
-	    transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
-	    coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
-	    detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
+	    transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), anyBGReads(false),
+	    ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
+	    bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
+	    bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
+	    bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
+	    usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
+	    feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
+	    feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
+	    feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
+	    GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), outstandingWatches(0), sharedStatePtr(nullptr),
+	    lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0),
+	    lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
+	    clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0),
+	    healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
+	    smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
 	    specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
 	    connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
@@ -1826,14 +1844,17 @@ DatabaseContext::DatabaseContext(const Error& err)
 	    transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
 	    transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
 	    transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
-	    transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
-	    bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
-	    feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
-	    feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
-	    feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
-	    commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
-	    bgGranulesPerRequest(1000), sharedStatePtr(nullptr), transactionTracingSample(false),
-	    smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
+	    transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), anyBGReads(false),
+	    ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
+	    bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
+	    bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
+	    bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
+	    usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
+	    feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
+	    feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
+	    feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
+	    GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), sharedStatePtr(nullptr),
+	    transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
 	    connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
 
 // Static constructor used by server processes to create a DatabaseContext
@@ -8088,8 +8109,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
 		}
 		if (blobGranuleMapping.more) {
 			if (BG_REQUEST_DEBUG) {
-				fmt::print(
-				    "BG Mapping for [{0} - %{1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable());
+				fmt::print("BG Mapping for [{0} - {1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable());
 			}
 			TraceEvent(SevWarn, "BGMappingTooLarge")
 			    .detail("Range", range)
@@ -8302,7 +8322,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
 		}
 	}
 
-	self->trState->cx->anyBlobGranuleRequests = true;
+	self->trState->cx->anyBGReads = true;
 	self->trState->cx->bgGranulesPerRequest.addSample(results.size());
 	self->trState->cx->bgLatencies.addSample(now() - startTime);
 
@@ -8344,8 +8364,13 @@ Transaction::summarizeBlobGranules(const KeyRange& range, Optional<Version> summ
 }
 
 void Transaction::addGranuleMaterializeStats(const GranuleMaterializeStats& stats) {
+	trState->cx->anyBGReads = true;
 	trState->cx->bgReadInputBytes += stats.inputBytes;
 	trState->cx->bgReadOutputBytes += stats.outputBytes;
+	trState->cx->bgReadSnapshotRows += stats.snapshotRows;
+	trState->cx->bgReadRowsCleared += stats.rowsCleared;
+	trState->cx->bgReadRowsInserted += stats.rowsInserted;
+	trState->cx->bgReadRowsUpdated += stats.rowsUpdated;
 }
 
 ACTOR Future<Void> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {

diff --git a/fdbclient/include/fdbclient/BlobGranuleCommon.h b/fdbclient/include/fdbclient/BlobGranuleCommon.h
index 6f530f020d..7cdc72fb71 100644
--- a/fdbclient/include/fdbclient/BlobGranuleCommon.h
+++ b/fdbclient/include/fdbclient/BlobGranuleCommon.h
@@ -56,10 +56,18 @@ struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
 };
 
 struct GranuleMaterializeStats {
+	// file-level stats
 	int64_t inputBytes;
 	int64_t outputBytes;
 
-	GranuleMaterializeStats() : inputBytes(0), outputBytes(0) {}
+	// merge stats
+	int32_t snapshotRows;
+	int32_t rowsCleared;
+	int32_t rowsInserted;
+	int32_t rowsUpdated;
+
+	GranuleMaterializeStats()
+	  : inputBytes(0), outputBytes(0), snapshotRows(0), rowsCleared(0), rowsInserted(0), rowsUpdated(0) {}
 };
 
 struct BlobGranuleCipherKeysMeta {

diff --git a/fdbclient/include/fdbclient/BlobGranuleFiles.h b/fdbclient/include/fdbclient/BlobGranuleFiles.h
index 23faff3d03..b9c6ceaf8c 100644
--- a/fdbclient/include/fdbclient/BlobGranuleFiles.h
+++ b/fdbclient/include/fdbclient/BlobGranuleFiles.h
@@ -51,7 +51,8 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
                                    Version beginVersion,
                                    Version readVersion,
                                    Optional<StringRef> snapshotData,
-                                   StringRef deltaFileData[]);
+                                   StringRef deltaFileData[],
+                                   GranuleMaterializeStats& stats);
 
 std::string randomBGFilename(UID blobWorkerID, UID granuleID, Version version, std::string suffix);

diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h
index 0898485d08..390873e0ef 100644
--- a/fdbclient/include/fdbclient/DatabaseContext.h
+++ b/fdbclient/include/fdbclient/DatabaseContext.h
@@ -554,8 +554,17 @@ public:
 	Counter transactionGrvFullBatches;
 	Counter transactionGrvTimedOutBatches;
 	Counter transactionCommitVersionNotFoundForSS;
+
+	// Blob Granule Read metrics. Omit from logging if not used.
+	bool anyBGReads;
+	CounterCollection ccBG;
 	Counter bgReadInputBytes;
 	Counter bgReadOutputBytes;
+	Counter bgReadSnapshotRows;
+	Counter bgReadRowsCleared;
+	Counter bgReadRowsInserted;
+	Counter bgReadRowsUpdated;
+	ContinuousSample<double> bgLatencies, bgGranulesPerRequest;
 
 	// Change Feed metrics. Omit change feed metrics from logging if not used
 	bool usedAnyChangeFeeds;
@@ -568,7 +577,7 @@ public:
 	Counter feedPopsFallback;
 
 	ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
-	    bytesPerCommit, bgLatencies, bgGranulesPerRequest;
+	    bytesPerCommit;
 
 	int outstandingWatches;
 	int maxOutstandingWatches;
@@ -597,7 +606,6 @@ public:
 	bool transactionTracingSample;
 	double verifyCausalReadsProp = 0.0;
 	bool blobGranuleNoMaterialize = false;
-	bool anyBlobGranuleRequests = false;
 
 	Future<Void> logger;
 	Future<Void> throttleExpirer;
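
The row accounting this patch adds to mergeDeltaStreams() reduces to three rules: a surviving key that never appeared in the snapshot stream (stream 0) is an insert; a snapshot key whose winning value came from a delta stream is an update; and a snapshot key whose final state is a clear is a cleared row. A snapshot key whose own value wins is carried through and shows up only in snapshotRows. The following minimal, self-contained C++ sketch restates that classification outside the FDB codebase; MergeStats and classifyRow are illustrative names, not FDB APIs:

    #include <cstdint>
    #include <iostream>

    // Stand-in for the merge-level fields of GranuleMaterializeStats.
    struct MergeStats {
        int64_t snapshotRows = 0, rowsCleared = 0, rowsInserted = 0, rowsUpdated = 0;
    };

    // keyInSnapshot  ~ cur.back().streamIdx == 0 && chunk.snapshotFile.present()
    // survives       ~ the winning entry for this key is a set, not a clear
    // finalFromDelta ~ the winning entry has streamIdx > 0
    void classifyRow(bool keyInSnapshot, bool survives, bool finalFromDelta, MergeStats& s) {
        if (survives) {
            if (!keyInSnapshot) {
                s.rowsInserted++; // new key introduced by a delta
            } else if (finalFromDelta) {
                s.rowsUpdated++; // snapshot key overwritten by a delta
            } // else: snapshot value carried through unchanged; nothing counted
        } else if (keyInSnapshot) {
            s.rowsCleared++; // snapshot key deleted by a delta
        }
    }

    int main() {
        MergeStats s;
        s.snapshotRows = 3; // as in the patch: size of stream 0 when a snapshot file is present
        classifyRow(false, true, true, s);  // insert
        classifyRow(true, true, true, s);   // update
        classifyRow(true, false, true, s);  // clear
        classifyRow(true, true, false, s);  // unchanged snapshot row
        std::cout << "inserted=" << s.rowsInserted << " updated=" << s.rowsUpdated
                  << " cleared=" << s.rowsCleared << "\n"; // inserted=1 updated=1 cleared=1
    }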
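
The databaseLogger() refactor applies the same gating pattern the client already uses for ChangeFeedClientMetrics: the read path flips a cheap flag (anyBGReads), the periodic logger emits the BlobGranuleReadMetrics event only when that flag is set, and the latency sample buffers are cleared every interval regardless. A standalone sketch of that pattern, with invented names (MetricsLogger, onBlobGranuleRead) rather than FDB APIs:

    #include <iostream>
    #include <numeric>
    #include <vector>

    struct MetricsLogger {
        bool anyBGReads = false;         // flipped by the read path, never reset
        std::vector<double> bgLatencies; // per-request latency samples

        void onBlobGranuleRead(double latencySec) {
            anyBGReads = true;
            bgLatencies.push_back(latencySec);
        }

        // Called once per logging interval.
        void logPeriodically() {
            if (anyBGReads) {
                double mean = bgLatencies.empty()
                                  ? 0.0
                                  : std::accumulate(bgLatencies.begin(), bgLatencies.end(), 0.0) /
                                        bgLatencies.size();
                std::cout << "BlobGranuleReadMetrics MeanBGLatency=" << mean << "\n";
            }
            bgLatencies.clear(); // samples reset every interval, logged or not
        }
    };

    int main() {
        MetricsLogger m;
        m.logPeriodically();        // no granule reads yet: no event emitted
        m.onBlobGranuleRead(0.012);
        m.logPeriodically();        // emits BlobGranuleReadMetrics
    }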