FastRestore:Applier:Add more trace for perf tracking
This commit is contained in:
parent
0b27786811
commit
0d668ea0c3
|
@ -553,7 +553,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
|
|||
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
|
||||
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
|
||||
init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
|
||||
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 200 + 1; }
|
||||
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
|
||||
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -247,7 +247,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
// Apply range mutations (i.e., clearRange) to database cx
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Applying clear range mutations to DB");
|
||||
.detail("Step", "Applying clear range mutations to DB")
|
||||
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
||||
state std::vector<Future<Void>> fClearRanges;
|
||||
std::vector<Standalone<VectorRef<KeyRangeRef>>> clearBuf;
|
||||
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
|
||||
|
@ -271,7 +272,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Applying clear range mutations to staging keys");
|
||||
.detail("Step", "Applying clear range mutations to staging keys")
|
||||
.detail("ClearRanges", batchData->stagingKeyRanges.size());
|
||||
for (auto& rangeMutation : batchData->stagingKeyRanges) {
|
||||
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
|
||||
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2);
|
||||
|
@ -284,7 +286,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
wait(waitForAll(fClearRanges));
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Getting and computing staging keys");
|
||||
.detail("Step", "Getting and computing staging keys")
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
|
||||
// Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value
|
||||
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys;
|
||||
|
@ -299,7 +302,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Compute the other staging keys");
|
||||
.detail("Step", "Compute the other staging keys")
|
||||
.detail("StagingKeys", batchData->stagingKeys.size());
|
||||
// Pre-compute pendingMutations to other keys in stagingKeys that has base value
|
||||
for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end();
|
||||
stagingKeyIter++) {
|
||||
|
@ -322,10 +326,11 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
// Apply mutations in batchData->stagingKeys [begin, end).
|
||||
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
|
||||
std::map<Key, StagingKey>::iterator end, Database cx,
|
||||
FlowLock* applyStagingKeysBatchLock) {
|
||||
FlowLock* applyStagingKeysBatchLock, UID applierID) {
|
||||
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB));
|
||||
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
|
||||
loop {
|
||||
try {
|
||||
tr->reset();
|
||||
|
@ -358,20 +363,23 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
std::map<Key, StagingKey>::iterator cur = begin;
|
||||
double txnSize = 0;
|
||||
std::vector<Future<Void>> fBatches;
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex);
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size());
|
||||
while (cur != batchData->stagingKeys.end()) {
|
||||
txnSize += cur->second.expectedMutationSize();
|
||||
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock));
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
|
||||
begin = cur;
|
||||
txnSize = 0;
|
||||
}
|
||||
cur++;
|
||||
}
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock));
|
||||
if (begin != batchData->stagingKeys.end()) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID));
|
||||
}
|
||||
|
||||
wait(waitForAll(fBatches));
|
||||
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex);
|
||||
TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size());
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -380,6 +388,7 @@ ACTOR Future<Void> applyToDBV2(UID applierID, int64_t batchIndex, Reference<Appl
|
|||
wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx));
|
||||
|
||||
wait(applyStagingKeys(batchData, applierID, batchIndex, cx));
|
||||
TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex);
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -738,7 +738,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
|
|||
int batchIndex, NotifiedVersion* finishedBatch) {
|
||||
|
||||
wait(finishedBatch->whenAtLeast(batchIndex - 1));
|
||||
TraceEvent("FastRestoreMasterPhaseApplyToDBStart")
|
||||
TraceEvent("FastRestoreMasterPhaseApplyToDB")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("FinishedBatch", finishedBatch->get());
|
||||
|
||||
|
@ -746,7 +746,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
|
|||
// Prepare the applyToDB requests
|
||||
std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests;
|
||||
|
||||
TraceEvent("FastRestoreMasterPhaseApplyMutations")
|
||||
TraceEvent("FastRestoreMasterPhaseApplyToDB")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Appliers", appliersInterf.size());
|
||||
for (auto& applier : appliersInterf) {
|
||||
|
@ -762,7 +762,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
|
|||
batchData->applyToDB = getBatchReplies(&RestoreApplierInterface::applyToDB, appliersInterf, requests,
|
||||
&replies, TaskPriority::RestoreApplierWriteDB);
|
||||
} else {
|
||||
TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations")
|
||||
TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Attention", "Actor should not be invoked twice for the same batch index");
|
||||
}
|
||||
|
@ -775,7 +775,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
|
|||
if (batchStatus->applyStatus[reply.id] == RestoreApplyStatus::Applying) {
|
||||
batchStatus->applyStatus[reply.id] = RestoreApplyStatus::Applied;
|
||||
if (reply.isDuplicated) {
|
||||
TraceEvent(SevWarn, "FastRestoreNotifyApplierToApplierMutations")
|
||||
TraceEvent(SevWarn, "FastRestoreMasterPhaseApplyToDB")
|
||||
.detail("Applier", reply.id)
|
||||
.detail("DuplicateRequestReturnEarlier", "Apply db request should have been processed");
|
||||
}
|
||||
|
@ -783,7 +783,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat
|
|||
}
|
||||
for (auto& applier : appliersInterf) {
|
||||
if (batchStatus->applyStatus[applier.first] != RestoreApplyStatus::Applied) {
|
||||
TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations")
|
||||
TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB")
|
||||
.detail("Applier", applier.first)
|
||||
.detail("ApplyStatus", batchStatus->applyStatus[applier.first]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue