FastRestore:Applier:Fix precompute mutation result
This commit is contained in:
parent
b1b44d4477
commit
b5e60585aa
|
@ -111,7 +111,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersioned
|
|||
// Note: Insert new items into processedFileState will not invalidate the reference.
|
||||
state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset];
|
||||
|
||||
TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("ProcessedFileVersion", curFilePos.get())
|
||||
|
@ -165,6 +165,11 @@ ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersioned
|
|||
}
|
||||
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("ProcessedFileVersion", curFilePos.get())
|
||||
.detail("Request", req.toString());
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -238,11 +243,10 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
|
|||
|
||||
ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData> batchData, UID applierID,
|
||||
int64_t batchIndex, Database cx) {
|
||||
// Apply range mutations (i.e., clearRange) to database cx
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Applying clear range mutations");
|
||||
;
|
||||
// Apply range mutations (i.e., clearRange) to database cx
|
||||
.detail("Step", "Applying clear range mutations to DB");
|
||||
state std::vector<Future<Void>> fClearRanges;
|
||||
std::vector<Standalone<VectorRef<KeyRangeRef>>> clearBuf;
|
||||
clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>());
|
||||
|
@ -264,11 +268,15 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
}
|
||||
|
||||
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
|
||||
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
|
||||
.detail("BatchIndex", batchIndex)
|
||||
.detail("Step", "Applying clear range mutations to staging keys");
|
||||
for (auto& rangeMutation : batchData->stagingKeyRanges) {
|
||||
std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1);
|
||||
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param1);
|
||||
std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2);
|
||||
while (lb != ub) {
|
||||
lb->second.add(rangeMutation.mutation, rangeMutation.version);
|
||||
lb++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ struct StagingKey {
|
|||
MutationsVec& mutations = pendingMutations[newVersion];
|
||||
mutations.push_back_deep(mutations.arena(), m);
|
||||
}
|
||||
} else if (version == newVersion) {
|
||||
} else if (version == newVersion) { // Sanity check
|
||||
TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion")
|
||||
.detail("Version", newVersion)
|
||||
.detail("NewMutation", m.toString())
|
||||
|
@ -113,6 +113,7 @@ struct StagingKey {
|
|||
}
|
||||
while (lb != pendingMutations.end()) {
|
||||
if (lb->first == version) {
|
||||
lb++;
|
||||
continue;
|
||||
}
|
||||
for (auto& mutation : lb->second) {
|
||||
|
@ -138,6 +139,7 @@ struct StagingKey {
|
|||
}
|
||||
}
|
||||
version = lb->first;
|
||||
lb++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -264,7 +264,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
|
|||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("UseRangeFile", req.useRangeFile);
|
||||
wait(batchStatus->sendAllRanges.get());
|
||||
} else if (req.useRangeFile) {
|
||||
} else {
|
||||
TraceEvent(SevDebug, "FastRestoreSendMutationsSkipDuplicateRangeRequest", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("UseRangeFile", req.useRangeFile);
|
||||
|
|
Loading…
Reference in New Issue