Add new BG read metrics to the client and refactor them into a new trace event (#8493)

Josh Slocum 2022-10-18 14:13:35 -05:00 committed by GitHub
parent a6a53b40fd
commit 89519343a7
6 changed files with 131 additions and 63 deletions

View File

@@ -1324,7 +1324,8 @@ typedef std::priority_queue<MergeStreamNext, std::vector<MergeStreamNext>, Order
static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
const std::vector<Standalone<VectorRef<ParsedDeltaBoundaryRef>>>& streams,
const std::vector<bool> startClears) {
const std::vector<bool> startClears,
GranuleMaterializeStats& stats) {
ASSERT(streams.size() < std::numeric_limits<int16_t>::max());
ASSERT(startClears.size() == streams.size());
@@ -1357,6 +1358,10 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
}
}
if (chunk.snapshotFile.present()) {
stats.snapshotRows += streams[0].size();
}
RangeResult result;
std::vector<MergeStreamNext> cur;
cur.reserve(streams.size());
@@ -1373,6 +1378,7 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
// un-set clears and find latest value for key (if present)
bool foundValue = false;
bool includesSnapshot = cur.back().streamIdx == 0 && chunk.snapshotFile.present();
for (auto& it : cur) {
auto& v = streams[it.streamIdx][it.dataIdx];
if (clearActive[it.streamIdx]) {
@@ -1392,6 +1398,13 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
KeyRef finalKey =
chunk.tenantPrefix.present() ? v.key.removePrefix(chunk.tenantPrefix.get()) : v.key;
result.push_back_deep(result.arena(), KeyValueRef(finalKey, v.value));
if (!includesSnapshot) {
stats.rowsInserted++;
} else if (it.streamIdx > 0) {
stats.rowsUpdated++;
}
} else if (includesSnapshot) {
stats.rowsCleared++;
}
}
}
@@ -1413,6 +1426,8 @@ static RangeResult mergeDeltaStreams(const BlobGranuleChunkRef& chunk,
}
}
stats.outputBytes += result.expectedSize();
return result;
}
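Read together, the new merge-time counters classify every output key by whether the snapshot stream (index 0) contributed it and what the newest delta did to it. Below is a minimal standalone sketch of that classification rule as I read the hunk; Stats and classifyRow are simplified stand-ins, not the FDB types.

// Hypothetical standalone sketch of the row classification above.
#include <cstdint>
#include <cstdio>

struct Stats {
	int64_t snapshotRows = 0, rowsCleared = 0, rowsInserted = 0, rowsUpdated = 0;
};

// includesSnapshot: the snapshot stream (index 0) contributed this key.
// foundValue: after applying deltas, the key still has a value.
// winningStreamIdx: stream that supplied the value that survives the merge.
void classifyRow(bool includesSnapshot, bool foundValue, int winningStreamIdx, Stats& s) {
	if (foundValue) {
		if (!includesSnapshot) {
			s.rowsInserted++; // key exists only in deltas
		} else if (winningStreamIdx > 0) {
			s.rowsUpdated++; // a delta overwrote a snapshot row
		} // else: snapshot row passed through unchanged
	} else if (includesSnapshot) {
		s.rowsCleared++; // a delta cleared a snapshot row
	}
}

int main() {
	Stats s;
	classifyRow(false, true, 2, s); // delta-only insert
	classifyRow(true, true, 1, s); // snapshot row updated by a delta
	classifyRow(true, false, 1, s); // snapshot row cleared by a delta
	std::printf("inserted=%lld updated=%lld cleared=%lld\n",
	            (long long)s.rowsInserted,
	            (long long)s.rowsUpdated,
	            (long long)s.rowsCleared);
	return 0;
}

snapshotRows, by contrast, is bumped once per snapshot row up front (streams[0].size()), independent of the per-key outcome.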
@@ -1421,7 +1436,8 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
Version beginVersion,
Version readVersion,
Optional<StringRef> snapshotData,
StringRef deltaFileData[]) {
StringRef deltaFileData[],
GranuleMaterializeStats& stats) {
// TODO REMOVE with early replying
ASSERT(readVersion == chunk.includedVersion);
@@ -1444,6 +1460,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
streams.reserve(chunk.deltaFiles.size() + 2);
if (snapshotData.present()) {
stats.inputBytes += snapshotData.get().size();
ASSERT(chunk.snapshotFile.present());
Standalone<VectorRef<ParsedDeltaBoundaryRef>> snapshotRows =
loadSnapshotFile(chunk.snapshotFile.get().filename,
@@ -1461,6 +1478,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
fmt::print("Applying {} delta files\n", chunk.deltaFiles.size());
}
for (int deltaIdx = 0; deltaIdx < chunk.deltaFiles.size(); deltaIdx++) {
stats.inputBytes += deltaFileData[deltaIdx].size();
bool startClear = false;
auto deltaRows = loadChunkedDeltaFile(chunk.deltaFiles[deltaIdx].filename,
deltaFileData[deltaIdx],
@@ -1480,6 +1498,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
fmt::print("Applying {} memory deltas\n", chunk.newDeltas.size());
}
if (!chunk.newDeltas.empty()) {
stats.inputBytes += chunk.newDeltas.expectedSize();
// TODO REMOVE validation
ASSERT(beginVersion <= chunk.newDeltas.front().version);
ASSERT(readVersion >= chunk.newDeltas.back().version);
@@ -1491,7 +1510,7 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
}
}
return mergeDeltaStreams(chunk, streams, startClears);
return mergeDeltaStreams(chunk, streams, startClears, stats);
}
struct GranuleLoadFreeHandle : NonCopyable, ReferenceCounted<GranuleLoadFreeHandle> {
@@ -1560,8 +1579,6 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
}
GranuleLoadIds loadIds[files.size()];
int64_t inputBytes = 0;
int64_t outputBytes = 0;
try {
// Kick off first file reads if parallelism > 1
@@ -1586,7 +1603,6 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
if (!snapshotData.get().begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
inputBytes += snapshotData.get().size();
}
// +1 to avoid UBSAN variable length array of size zero
@@ -1599,16 +1615,11 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
if (!deltaData[i].begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
inputBytes += deltaData[i].size();
}
inputBytes += files[chunkIdx].newDeltas.expectedSize();
// materialize rows from chunk
chunkRows =
materializeBlobGranule(files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData);
outputBytes += chunkRows.expectedSize();
chunkRows = materializeBlobGranule(
files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData, stats);
results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
@@ -1616,8 +1627,6 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
// free once done by forcing FreeHandles to trigger
loadIds[chunkIdx].freeHandles.clear();
}
stats.inputBytes = inputBytes;
stats.outputBytes = outputBytes;
return ErrorOr<RangeResult>(results);
} catch (Error& e) {
return ErrorOr<RangeResult>(e);
@@ -2303,6 +2312,7 @@ void checkDeltaRead(const KeyValueGen& kvGen,
// expected answer
std::map<KeyRef, ValueRef> expectedData;
Version lastFileEndVersion = 0;
GranuleMaterializeStats stats;
fmt::print("Delta Read [{0} - {1}) @ {2} - {3}\n",
range.begin.printable(),
@@ -2322,7 +2332,7 @@ void checkDeltaRead(const KeyValueGen& kvGen,
chunk.includedVersion = readVersion;
chunk.snapshotVersion = invalidVersion;
RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, {}, serialized);
RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, {}, serialized, stats);
if (expectedData.size() != actualData.size()) {
fmt::print("Expected Data {0}:\n", expectedData.size());
@@ -2430,6 +2440,7 @@ void checkGranuleRead(const KeyValueGen& kvGen,
}
Version lastFileEndVersion = 0;
applyDeltasByVersion(deltaData, range, beginVersion, readVersion, lastFileEndVersion, expectedData);
GranuleMaterializeStats stats;
// actual answer
Standalone<BlobGranuleChunkRef> chunk;
@@ -2477,7 +2488,8 @@ void checkGranuleRead(const KeyValueGen& kvGen,
if (beginVersion == 0) {
snapshotPtr = serializedSnapshot;
}
RangeResult actualData = materializeBlobGranule(chunk, range, beginVersion, readVersion, snapshotPtr, deltaPtrs);
RangeResult actualData =
materializeBlobGranule(chunk, range, beginVersion, readVersion, snapshotPtr, deltaPtrs, stats);
if (expectedData.size() != actualData.size()) {
fmt::print("Expected Size {0} != Actual Size {1}\n", expectedData.size(), actualData.size());
@@ -2822,6 +2834,7 @@ std::pair<int64_t, double> doReadBench(const FileSet& fileSet,
Version readVersion = std::get<1>(fileSet.deltaFiles.back());
Standalone<BlobGranuleChunkRef> chunk;
GranuleMaterializeStats stats;
StringRef deltaPtrs[fileSet.deltaFiles.size()];
MutationRef clearAllAtEndMutation;
@@ -2875,14 +2888,25 @@ std::pair<int64_t, double> doReadBench(const FileSet& fileSet,
}
serializedBytes += actualData.expectedSize();
} else {
RangeResult actualData =
materializeBlobGranule(chunk, readRange, 0, readVersion, std::get<2>(fileSet.snapshotFile), deltaPtrs);
RangeResult actualData = materializeBlobGranule(
chunk, readRange, 0, readVersion, std::get<2>(fileSet.snapshotFile), deltaPtrs, stats);
serializedBytes += actualData.expectedSize();
}
}
elapsed += timer_monotonic();
elapsed /= READ_RUNS;
serializedBytes /= READ_RUNS;
// TODO REMOVE
fmt::print("Materialize stats:\n");
fmt::print(" Input bytes: {0}\n", stats.inputBytes);
fmt::print(" Output bytes: {0}\n", stats.outputBytes);
fmt::print(" Write Amp: {0}\n", (1.0 * stats.inputBytes) / stats.outputBytes);
fmt::print(" Snapshot Rows: {0}\n", stats.snapshotRows);
fmt::print(" Rows Cleared: {0}\n", stats.rowsCleared);
fmt::print(" Rows Inserted: {0}\n", stats.rowsInserted);
fmt::print(" Rows Updated: {0}\n", stats.rowsUpdated);
return { serializedBytes, elapsed };
}
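The "Write Amp" line printed above is simply inputBytes divided by outputBytes. A tiny sketch of that arithmetic; the zero-output guard is my addition, not in the commit:

#include <cstdint>
#include <cstdio>

// Hypothetical helper mirroring the ratio printed by the benchmark; guards
// against outputBytes == 0, which the bench's direct division does not.
double readWriteAmp(int64_t inputBytes, int64_t outputBytes) {
	return outputBytes == 0 ? 0.0 : (1.0 * inputBytes) / outputBytes;
}

int main() {
	// e.g. 3000 input bytes materialized into 1000 output bytes -> 3.00
	std::printf("%.2f\n", readWriteAmp(3000, 1000));
	return 0;
}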

View File

@@ -105,7 +105,9 @@ ACTOR Future<RangeResult> readBlobGranule(BlobGranuleChunkRef chunk,
arena.dependsOn(data.arena());
}
return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData);
// TODO do something useful with stats?
GranuleMaterializeStats stats;
return materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData, stats);
} catch (Error& e) {
throw e;

View File

@@ -689,25 +689,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
.detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
.detail("MaxBytesPerCommit", cx->bytesPerCommit.max())
.detail("NumLocalityCacheEntries", cx->locationCache.size());
if (cx->anyBlobGranuleRequests) {
ev.detail("MeanBGLatency", cx->bgLatencies.mean())
.detail("MedianBGLatency", cx->bgLatencies.median())
.detail("MaxBGLatency", cx->bgLatencies.max())
.detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
.detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
.detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
}
}
cx->latencies.clear();
cx->readLatencies.clear();
cx->GRVLatencies.clear();
cx->commitLatencies.clear();
cx->mutationsPerCommit.clear();
cx->bytesPerCommit.clear();
cx->bgLatencies.clear();
cx->bgGranulesPerRequest.clear();
if (cx->usedAnyChangeFeeds && logTraces) {
TraceEvent feedEv("ChangeFeedClientMetrics", cx->dbId);
@@ -721,6 +704,37 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
cx->ccFeed.logToTraceEvent(feedEv);
}
if (cx->anyBGReads && logTraces) {
TraceEvent bgReadEv("BlobGranuleReadMetrics", cx->dbId);
bgReadEv.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
.detail("Cluster",
cx->getConnectionRecord()
? cx->getConnectionRecord()->getConnectionString().clusterKeyName().toString()
: "")
.detail("Internal", cx->internal);
// add counters
cx->ccBG.logToTraceEvent(bgReadEv);
// add latencies
bgReadEv.detail("MeanBGLatency", cx->bgLatencies.mean())
.detail("MedianBGLatency", cx->bgLatencies.median())
.detail("MaxBGLatency", cx->bgLatencies.max())
.detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
.detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
.detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
}
cx->latencies.clear();
cx->readLatencies.clear();
cx->GRVLatencies.clear();
cx->commitLatencies.clear();
cx->mutationsPerCommit.clear();
cx->bytesPerCommit.clear();
cx->bgLatencies.clear();
cx->bgGranulesPerRequest.clear();
lastLogged = now();
}
}
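The refactor gates the entire BlobGranuleReadMetrics event on anyBGReads, so a client that never touches blob granules emits no event at all for the feature. A hypothetical standalone mirror of that omit-if-unused pattern (EventBuilder and ClientMetrics are stand-ins, not flow's TraceEvent or DatabaseContext):

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

struct EventBuilder {
	std::string name;
	std::map<std::string, int64_t> fields;
	~EventBuilder() { // emit on scope exit, like TraceEvent
		std::printf("%s", name.c_str());
		for (auto& [k, v] : fields)
			std::printf(" %s=%lld", k.c_str(), (long long)v);
		std::printf("\n");
	}
};

struct ClientMetrics {
	bool anyBGReads = false; // set on the first blob granule read
	int64_t bgReadInputBytes = 0, bgReadOutputBytes = 0;

	void log() {
		if (!anyBGReads)
			return; // no event at all if the feature was never used
		EventBuilder ev{ "BlobGranuleReadMetrics" };
		ev.fields["BGReadInputBytes"] = bgReadInputBytes;
		ev.fields["BGReadOutputBytes"] = bgReadOutputBytes;
		// reset for the next logging interval, like the clear() calls above
		bgReadInputBytes = bgReadOutputBytes = 0;
	}
};

int main() {
	ClientMetrics m;
	m.log(); // nothing emitted
	m.anyBGReads = true;
	m.bgReadInputBytes = 4096;
	m.bgReadOutputBytes = 1024;
	m.log(); // emits: BlobGranuleReadMetrics BGReadInputBytes=4096 BGReadOutputBytes=1024
	return 0;
}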
@@ -1526,17 +1540,21 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), anyBGReads(false),
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), outstandingWatches(0), sharedStatePtr(nullptr),
lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0),
lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo),
clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
@@ -1826,14 +1844,17 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
bgGranulesPerRequest(1000), sharedStatePtr(nullptr), transactionTracingSample(false),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), anyBGReads(false),
ccBG("BlobGranuleReadMetrics"), bgReadInputBytes("BGReadInputBytes", ccBG),
bgReadOutputBytes("BGReadOutputBytes", ccBG), bgReadSnapshotRows("BGReadSnapshotRows", ccBG),
bgReadRowsCleared("BGReadRowsCleared", ccBG), bgReadRowsInserted("BGReadRowsInserted", ccBG),
bgReadRowsUpdated("BGReadRowsUpdated", ccBG), bgLatencies(1000), bgGranulesPerRequest(1000),
usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), sharedStatePtr(nullptr),
transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
// Static constructor used by server processes to create a DatabaseContext
@@ -8088,8 +8109,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
}
if (blobGranuleMapping.more) {
if (BG_REQUEST_DEBUG) {
fmt::print(
"BG Mapping for [{0} - %{1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable());
fmt::print("BG Mapping for [{0} - {1}) too large!\n", keyRange.begin.printable(), keyRange.end.printable());
}
TraceEvent(SevWarn, "BGMappingTooLarge")
.detail("Range", range)
@@ -8302,7 +8322,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
}
}
self->trState->cx->anyBlobGranuleRequests = true;
self->trState->cx->anyBGReads = true;
self->trState->cx->bgGranulesPerRequest.addSample(results.size());
self->trState->cx->bgLatencies.addSample(now() - startTime);
@@ -8344,8 +8364,13 @@ Transaction::summarizeBlobGranules(const KeyRange& range, Optional<Version> summ
}
void Transaction::addGranuleMaterializeStats(const GranuleMaterializeStats& stats) {
trState->cx->anyBGReads = true;
trState->cx->bgReadInputBytes += stats.inputBytes;
trState->cx->bgReadOutputBytes += stats.outputBytes;
trState->cx->bgReadSnapshotRows += stats.snapshotRows;
trState->cx->bgReadRowsCleared += stats.rowsCleared;
trState->cx->bgReadRowsInserted += stats.rowsInserted;
trState->cx->bgReadRowsUpdated += stats.rowsUpdated;
}
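addGranuleMaterializeStats folds one request's stats into the long-lived client counters with +=. A hypothetical minimal stand-in for that shape (the real Counter lives in flow's stats code and does more, e.g. registering with a CounterCollection):

#include <cstdint>

// Stand-in Counter supporting the += accumulation used above.
struct Counter {
	int64_t value = 0;
	void operator+=(int64_t delta) { value += delta; }
};

struct GranuleMaterializeStats {
	int64_t inputBytes = 0, outputBytes = 0;
};

int main() {
	Counter bgReadInputBytes, bgReadOutputBytes;
	GranuleMaterializeStats stats{ 2048, 512 }; // as produced by one materialization
	bgReadInputBytes += stats.inputBytes; // same pattern as the member function above
	bgReadOutputBytes += stats.outputBytes;
	return (bgReadInputBytes.value == 2048 && bgReadOutputBytes.value == 512) ? 0 : 1;
}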
ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {

View File

@@ -56,10 +56,18 @@ struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
};
struct GranuleMaterializeStats {
// file-level stats
int64_t inputBytes;
int64_t outputBytes;
GranuleMaterializeStats() : inputBytes(0), outputBytes(0) {}
// merge stats
int32_t snapshotRows;
int32_t rowsCleared;
int32_t rowsInserted;
int32_t rowsUpdated;
GranuleMaterializeStats()
: inputBytes(0), outputBytes(0), snapshotRows(0), rowsCleared(0), rowsInserted(0), rowsUpdated(0) {}
};
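One GranuleMaterializeStats instance is threaded by reference through every chunk a request materializes and only afterwards handed to the client's counters, so the struct is cheap per-request scratch space. A simplified self-contained sketch of that flow (Chunk and materializeChunk are hypothetical stand-ins):

#include <cstdint>
#include <vector>

struct GranuleMaterializeStats {
	int64_t inputBytes = 0, outputBytes = 0;
	int32_t snapshotRows = 0, rowsCleared = 0, rowsInserted = 0, rowsUpdated = 0;
};

// Hypothetical stand-in for one chunk's worth of materialization work.
struct Chunk {
	int64_t bytesIn, bytesOut;
};

// Stand-in for materializeBlobGranule: consumes a chunk, updates shared stats.
void materializeChunk(const Chunk& c, GranuleMaterializeStats& stats) {
	stats.inputBytes += c.bytesIn;
	stats.outputBytes += c.bytesOut;
}

int main() {
	GranuleMaterializeStats stats; // zeroed once per request
	std::vector<Chunk> chunks = { { 100, 40 }, { 50, 10 } };
	for (const auto& c : chunks)
		materializeChunk(c, stats); // the same object accumulates across chunks
	// a caller like Transaction::addGranuleMaterializeStats would now fold
	// stats into the long-lived Counters in one shot
	return (stats.inputBytes == 150 && stats.outputBytes == 50) ? 0 : 1;
}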
struct BlobGranuleCipherKeysMeta {

View File

@@ -51,7 +51,8 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
Version beginVersion,
Version readVersion,
Optional<StringRef> snapshotData,
StringRef deltaFileData[]);
StringRef deltaFileData[],
GranuleMaterializeStats& stats);
std::string randomBGFilename(UID blobWorkerID, UID granuleID, Version version, std::string suffix);

View File

@@ -554,8 +554,17 @@ public:
Counter transactionGrvFullBatches;
Counter transactionGrvTimedOutBatches;
Counter transactionCommitVersionNotFoundForSS;
// Blob Granule Read metrics. Omit from logging if not used.
bool anyBGReads;
CounterCollection ccBG;
Counter bgReadInputBytes;
Counter bgReadOutputBytes;
Counter bgReadSnapshotRows;
Counter bgReadRowsCleared;
Counter bgReadRowsInserted;
Counter bgReadRowsUpdated;
ContinuousSample<double> bgLatencies, bgGranulesPerRequest;
// Change Feed metrics. Omit change feed metrics from logging if not used
bool usedAnyChangeFeeds;
@@ -568,7 +577,7 @@ public:
Counter feedPopsFallback;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
bytesPerCommit, bgLatencies, bgGranulesPerRequest;
bytesPerCommit;
int outstandingWatches;
int maxOutstandingWatches;
@@ -597,7 +606,6 @@ public:
bool transactionTracingSample;
double verifyCausalReadsProp = 0.0;
bool blobGranuleNoMaterialize = false;
bool anyBlobGranuleRequests = false;
Future<Void> logger;
Future<Void> throttleExpirer;
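ccBG follows the CounterCollection pattern already used by ccFeed: each Counter registers itself with a named collection at construction, and logToTraceEvent later walks the collection into a single event (as seen in databaseLogger above). A hypothetical simplified mirror of that registration-and-dump shape (not the real flow classes):

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

struct Counter;

struct CounterCollection {
	std::string name;
	std::vector<Counter*> members;
	explicit CounterCollection(std::string n) : name(std::move(n)) {}
	void logAll() const; // stand-in for logToTraceEvent
};

struct Counter {
	std::string name;
	int64_t value = 0;
	// registration at construction, mirroring Counter("BGReadInputBytes", ccBG)
	Counter(std::string n, CounterCollection& cc) : name(std::move(n)) { cc.members.push_back(this); }
	void operator+=(int64_t delta) { value += delta; }
};

void CounterCollection::logAll() const {
	std::printf("%s:", name.c_str());
	for (auto* c : members)
		std::printf(" %s=%lld", c->name.c_str(), (long long)c->value);
	std::printf("\n");
}

int main() {
	CounterCollection ccBG("BlobGranuleReadMetrics");
	Counter in("BGReadInputBytes", ccBG), out("BGReadOutputBytes", ccBG);
	in += 4096;
	out += 1024;
	ccBG.logAll(); // BlobGranuleReadMetrics: BGReadInputBytes=4096 BGReadOutputBytes=1024
	return 0;
}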