diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 18843ca496..3b2755b290 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -36,6 +36,8 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, Reference self); +ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, + Reference self); ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference self, Database cx); @@ -61,7 +63,7 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int when(RestoreSendVersionedMutationsRequest req = waitNext(applierInterf.sendMutationVector.getFuture())) { requestTypeStr = "sendMutationVector"; - actors.add(handleSendMutationVectorRequest(req, self)); + actors.add(handleSendMutationVectorRequestV2(req, self)); } when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) { requestTypeStr = "applyToDB"; @@ -98,6 +100,68 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int return Void(); } +// The actor may be invovked multiple times and executed async. +// No race condition as long as we do not wait or yield when operate the shared data. +// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; +// Only one actor can process mutations from the same file +ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, + Reference self) { + state Reference batchData = self->batch[req.batchIndex]; + // Assume: processedFileState[req.asset] will not be erased while the actor is active. + // Note: Insert new items into processedFileState will not invalidate the reference. + state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset]; + + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) + .detail("BatchIndex", req.batchIndex) + .detail("RestoreAsset", req.asset.toString()) + .detail("ProcessedFileVersion", curFilePos.get()) + .detail("Request", req.toString()); + + wait(curFilePos.whenAtLeast(req.prevVersion)); + + state bool isDuplicated = true; + if (curFilePos.get() == req.prevVersion) { + isDuplicated = false; + Version commitVersion = req.version; + MutationsVec mutations(req.mutations); + // Sanity check: mutations in range file is in [beginVersion, endVersion); + // mutations in log file is in [beginVersion, endVersion], both inclusive. + ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion); + // Loader sends the endVersion to ensure all useful versions are sent + ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) || + (!req.isRangeFile && commitVersion <= req.asset.endVersion)); + + for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { + MutationRef mutation = mutations[mIndex]; + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations") + .detail("ApplierNode", self->id()) + .detail("RestoreAsset", req.asset.toString()) + .detail("Version", commitVersion) + .detail("Index", mIndex) + .detail("MutationReceived", mutation.toString()); + batchData->counters.receivedBytes += mutation.totalSize(); + batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified + batchData->counters.receivedMutations += 1; + batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0; + // Sanity check + if (g_network->isSimulated()) { + if (isRangeMutation(mutation)) { + ASSERT(mutation.param1 >= req.asset.range.begin && + mutation.param2 <= req.asset.range.end); // Range mutation's right side is exclusive + } else { + ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end); + } + } + // Note: Log and range mutations may be delivered out of order. Can we handle it? + batchData->stagingKeys.addMutation(mutation, commitVersion); + } + curFilePos.set(req.version); + } + + req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); + return Void(); +} + // The actor may be invovked multiple times and executed async. // No race condition as long as we do not wait or yield when operate the shared data. // Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 4883393da8..e18a3941fb 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -47,15 +47,9 @@ struct StagingKey { Version version; // largest version of set or clear for the key std::map pendingMutations; // mutations not set or clear type - // bool operator < (const StagingKey& rhs) const { - // return std::tie(key, version, type, value) - // } explicit StagingKey() : version(0) {} - explicit StagingKey(MutationRef m, Version version) - : key(m.param1), val(m.param2), type(m.type), version(versoin) {} void add(const MutationRef& m, Version newVersion) { - ASSERT(version > 0); // Only add mutation if (version < newVersion) { if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { key = m.param1; @@ -81,7 +75,6 @@ struct StagingKey { .detail("ExistingKeyType", typeString[type]); } } // else input mutation is old and can be ignored - return; } } @@ -128,6 +121,13 @@ struct ApplierBatchData : public ReferenceCounted { } ~ApplierBatchData() = default; + void addMutation(MutationRef m, Version ver) { + if (stagingKeys.find(m.param1) == stagingKeys.end()) { + stagingKeys.emplace(m.param1, StagingKey()); + } + stagingKeys[m.param1].add(m, ver); + } + void reset() { kvOps.clear(); dbApplier = Optional>();