Adding bg read amp metrics (#8275)
This commit is contained in:
parent
339183228d
commit
f78eb8c778
|
@ -1545,7 +1545,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||
const KeyRangeRef& keyRange,
|
||||
Version beginVersion,
|
||||
Version readVersion,
|
||||
ReadBlobGranuleContext granuleContext) {
|
||||
ReadBlobGranuleContext granuleContext,
|
||||
GranuleMaterializeStats& stats) {
|
||||
int64_t parallelism = granuleContext.granuleParallelism;
|
||||
if (parallelism < 1) {
|
||||
parallelism = 1;
|
||||
|
@ -1555,6 +1556,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||
}
|
||||
|
||||
GranuleLoadIds loadIds[files.size()];
|
||||
int64_t inputBytes = 0;
|
||||
int64_t outputBytes = 0;
|
||||
|
||||
try {
|
||||
// Kick off first file reads if parallelism > 1
|
||||
|
@ -1579,6 +1582,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||
if (!snapshotData.get().begin()) {
|
||||
return ErrorOr<RangeResult>(blob_granule_file_load_error());
|
||||
}
|
||||
inputBytes += snapshotData.get().size();
|
||||
}
|
||||
|
||||
// +1 to avoid UBSAN variable length array of size zero
|
||||
|
@ -1591,18 +1595,25 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||
if (!deltaData[i].begin()) {
|
||||
return ErrorOr<RangeResult>(blob_granule_file_load_error());
|
||||
}
|
||||
inputBytes += deltaData[i].size();
|
||||
}
|
||||
|
||||
inputBytes += files[chunkIdx].newDeltas.expectedSize();
|
||||
|
||||
// materialize rows from chunk
|
||||
chunkRows =
|
||||
materializeBlobGranule(files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData);
|
||||
|
||||
outputBytes += chunkRows.expectedSize();
|
||||
|
||||
results.arena().dependsOn(chunkRows.arena());
|
||||
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
|
||||
|
||||
// free once done by forcing FreeHandles to trigger
|
||||
loadIds[chunkIdx].freeHandles.clear();
|
||||
}
|
||||
stats.inputBytes = inputBytes;
|
||||
stats.outputBytes = outputBytes;
|
||||
return ErrorOr<RangeResult>(results);
|
||||
} catch (Error& e) {
|
||||
return ErrorOr<RangeResult>(e);
|
||||
|
|
|
@ -1470,13 +1470,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
|
|||
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
|
||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), usedAnyChangeFeeds(false),
|
||||
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
|
||||
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
|
||||
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
|
||||
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
|
||||
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000),
|
||||
outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
|
||||
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
|
||||
bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
|
||||
feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
|
||||
feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
|
||||
feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
|
||||
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
|
||||
bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
|
||||
lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
|
||||
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
|
||||
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
|
||||
|
@ -1770,13 +1770,13 @@ DatabaseContext::DatabaseContext(const Error& err)
|
|||
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
|
||||
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
|
||||
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
|
||||
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), usedAnyChangeFeeds(false),
|
||||
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
|
||||
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
|
||||
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
|
||||
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
|
||||
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000),
|
||||
sharedStatePtr(nullptr), transactionTracingSample(false),
|
||||
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
|
||||
bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
|
||||
feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
|
||||
feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
|
||||
feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
|
||||
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
|
||||
bgGranulesPerRequest(1000), sharedStatePtr(nullptr), transactionTracingSample(false),
|
||||
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
|
||||
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
|
||||
|
||||
|
@ -8077,6 +8077,11 @@ Transaction::summarizeBlobGranules(const KeyRange& range, Optional<Version> summ
|
|||
return summarizeBlobGranulesActor(this, range, summaryVersion, rangeLimit);
|
||||
}
|
||||
|
||||
void Transaction::addGranuleMaterializeStats(const GranuleMaterializeStats& stats) {
|
||||
trState->cx->bgReadInputBytes += stats.inputBytes;
|
||||
trState->cx->bgReadOutputBytes += stats.outputBytes;
|
||||
}
|
||||
|
||||
ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state Version version = invalidVersion;
|
||||
|
|
|
@ -1842,6 +1842,13 @@ Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> ReadYourWritesTransaction::
|
|||
return waitOrError(tr.summarizeBlobGranules(range, summaryVersion, rangeLimit), resetPromise.getFuture());
|
||||
}
|
||||
|
||||
void ReadYourWritesTransaction::addGranuleMaterializeStats(const GranuleMaterializeStats& stats) {
|
||||
if (checkUsedDuringCommit()) {
|
||||
throw used_during_commit();
|
||||
}
|
||||
tr.addGranuleMaterializeStats(stats);
|
||||
}
|
||||
|
||||
void ReadYourWritesTransaction::addReadConflictRange(KeyRangeRef const& keys) {
|
||||
if (checkUsedDuringCommit()) {
|
||||
throw used_during_commit();
|
||||
|
|
|
@ -454,7 +454,13 @@ ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranulesFinish(
|
|||
ReadBlobGranuleContext granuleContext) {
|
||||
// do this work off of fdb network threads for performance!
|
||||
Standalone<VectorRef<BlobGranuleChunkRef>> files = startFuture.get();
|
||||
return loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersion, granuleContext);
|
||||
GranuleMaterializeStats stats;
|
||||
auto ret = loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersion, granuleContext, stats);
|
||||
if (!ret.isError()) {
|
||||
ISingleThreadTransaction* tr = this->tr;
|
||||
onMainThreadVoid([tr, stats]() { tr->addGranuleMaterializeStats(stats); });
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ThreadFuture<Standalone<VectorRef<BlobGranuleSummaryRef>>> ThreadSafeTransaction::summarizeBlobGranules(
|
||||
|
|
|
@ -55,6 +55,13 @@ struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
|
|||
}
|
||||
};
|
||||
|
||||
struct GranuleMaterializeStats {
|
||||
int64_t inputBytes;
|
||||
int64_t outputBytes;
|
||||
|
||||
GranuleMaterializeStats() : inputBytes(0), outputBytes(0) {}
|
||||
};
|
||||
|
||||
struct BlobGranuleCipherKeysMeta {
|
||||
EncryptCipherDomainId textDomainId;
|
||||
EncryptCipherBaseKeyId textBaseCipherId;
|
||||
|
|
|
@ -43,7 +43,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
|
|||
const KeyRangeRef& keyRange,
|
||||
Version beginVersion,
|
||||
Version readVersion,
|
||||
ReadBlobGranuleContext granuleContext);
|
||||
ReadBlobGranuleContext granuleContext,
|
||||
GranuleMaterializeStats& stats);
|
||||
|
||||
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
|
||||
KeyRangeRef keyRange,
|
||||
|
|
|
@ -544,6 +544,8 @@ public:
|
|||
Counter transactionGrvFullBatches;
|
||||
Counter transactionGrvTimedOutBatches;
|
||||
Counter transactionCommitVersionNotFoundForSS;
|
||||
Counter bgReadInputBytes;
|
||||
Counter bgReadOutputBytes;
|
||||
|
||||
// Change Feed metrics. Omit change feed metrics from logging if not used
|
||||
bool usedAnyChangeFeeds;
|
||||
|
|
|
@ -70,6 +70,7 @@ public:
|
|||
throw client_invalid_operation();
|
||||
}
|
||||
Future<int64_t> getEstimatedRangeSizeBytes(KeyRange const& keys) override { throw client_invalid_operation(); }
|
||||
void addGranuleMaterializeStats(const GranuleMaterializeStats& stats) override { throw client_invalid_operation(); }
|
||||
void addReadConflictRange(KeyRangeRef const& keys) override { throw client_invalid_operation(); }
|
||||
void makeSelfConflicting() override { throw client_invalid_operation(); }
|
||||
void atomicOp(KeyRef const& key, ValueRef const& operand, uint32_t operationType) override {
|
||||
|
|
|
@ -88,6 +88,7 @@ public:
|
|||
virtual Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> summarizeBlobGranules(KeyRange const& range,
|
||||
Optional<Version> summaryVersion,
|
||||
int rangeLimit) = 0;
|
||||
virtual void addGranuleMaterializeStats(const GranuleMaterializeStats& stats) = 0;
|
||||
virtual void addReadConflictRange(KeyRangeRef const& keys) = 0;
|
||||
virtual void makeSelfConflicting() = 0;
|
||||
virtual void atomicOp(KeyRef const& key, ValueRef const& operand, uint32_t operationType) = 0;
|
||||
|
|
|
@ -423,6 +423,8 @@ public:
|
|||
Optional<Version> summaryVersion,
|
||||
int rangeLimit);
|
||||
|
||||
void addGranuleMaterializeStats(const GranuleMaterializeStats& stats);
|
||||
|
||||
// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
|
||||
void set(const KeyRef& key, const ValueRef& value, AddConflictRange = AddConflictRange::True);
|
||||
void atomicOp(const KeyRef& key,
|
||||
|
|
|
@ -130,6 +130,7 @@ public:
|
|||
Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> summarizeBlobGranules(const KeyRange& range,
|
||||
Optional<Version> summaryVersion,
|
||||
int rangeLimit) override;
|
||||
void addGranuleMaterializeStats(const GranuleMaterializeStats& stats) override;
|
||||
|
||||
void addReadConflictRange(KeyRangeRef const& keys) override;
|
||||
void makeSelfConflicting() override { tr.makeSelfConflicting(); }
|
||||
|
|
Loading…
Reference in New Issue