From c0f75d77b14e596173675185c18b7c1604dd4fae Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 12 Feb 2020 13:57:08 -0800 Subject: [PATCH 01/22] FastRestore:Applier:Intro StagingKey struct --- fdbserver/RestoreApplier.actor.cpp | 2 +- fdbserver/RestoreApplier.actor.h | 53 ++++++++++++++++++++++++++++++ fdbserver/RestoreCommon.actor.cpp | 2 -- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 01ee0e0b6d..18843ca496 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -134,7 +134,7 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMu } for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { MutationRef mutation = mutations[mIndex]; - TraceEvent(SevFRMutationInfo, "FastRestore") + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations") .detail("ApplierNode", self->id()) .detail("RestoreAsset", req.asset.toString()) .detail("Version", commitVersion) diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 82bff8506e..4883393da8 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -40,11 +40,64 @@ #include "flow/actorcompiler.h" // has to be last include +struct StagingKey { + Key key; // TODO: Maybe not needed? + Value val; + MutationRef::Type type; // set or clear + Version version; // largest version of set or clear for the key + std::map pendingMutations; // mutations not set or clear type + + // bool operator < (const StagingKey& rhs) const { + // return std::tie(key, version, type, value) + // } + explicit StagingKey() : version(0) {} + explicit StagingKey(MutationRef m, Version version) + : key(m.param1), val(m.param2), type(m.type), version(versoin) {} + + void add(const MutationRef& m, Version newVersion) { + ASSERT(version > 0); // Only add mutation + if (version < newVersion) { + if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { + key = m.param1; + val = m.param2; + type = m.type; + version = newVersion; + } else { + if (pendingMutations.find(newVersion) == pendingMutations.end()) { + pendingMutations.emplace(newVersion, MutationsVec()); + } + // TODO: Do we really need deep copy? + pendingMutations[newVersion].push_back_deep(pendingMutations.arena(), m); + } + } else if (version == newVersion) { + TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]); + if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { + TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]); + } + } // else input mutation is old and can be ignored + return; + } +} + +struct StagingKeyRange { + KeyRange range; + MutationRef::Type type; // clearrange + Version version; +} + struct ApplierBatchData : public ReferenceCounted { // processedFileState: key: RestoreAsset; value: largest version of mutation received on the applier std::map processedFileState; Optional> dbApplier; VersionedMutationsMap kvOps; // Mutations at each version + std::map stagingKeys; + std::set stagingKeyRanges; Future pollMetrics; diff --git a/fdbserver/RestoreCommon.actor.cpp b/fdbserver/RestoreCommon.actor.cpp index d8689d136f..0c336538da 100644 --- a/fdbserver/RestoreCommon.actor.cpp +++ b/fdbserver/RestoreCommon.actor.cpp @@ -292,8 +292,6 @@ std::string RestoreConfigFR::toString() { return ss.str(); } -//typedef RestoreConfigFR::RestoreFile RestoreFileFR; - // parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied // The implementation of parallelFileRestore is copied from FileBackupAgent.actor.cpp // parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied From 2bc82ffd705d7780e709c8f77795b4d482e70cd3 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 12 Feb 2020 14:12:38 -0800 Subject: [PATCH 02/22] FastRestore:Applier:Store received mutation by key --- fdbserver/RestoreApplier.actor.cpp | 66 +++++++++++++++++++++++++++++- fdbserver/RestoreApplier.actor.h | 14 +++---- 2 files changed, 72 insertions(+), 8 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 18843ca496..3b2755b290 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -36,6 +36,8 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, Reference self); +ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, + Reference self); ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference self, Database cx); @@ -61,7 +63,7 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int when(RestoreSendVersionedMutationsRequest req = waitNext(applierInterf.sendMutationVector.getFuture())) { requestTypeStr = "sendMutationVector"; - actors.add(handleSendMutationVectorRequest(req, self)); + actors.add(handleSendMutationVectorRequestV2(req, self)); } when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) { requestTypeStr = "applyToDB"; @@ -98,6 +100,68 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int return Void(); } +// The actor may be invovked multiple times and executed async. +// No race condition as long as we do not wait or yield when operate the shared data. +// Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; +// Only one actor can process mutations from the same file +ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, + Reference self) { + state Reference batchData = self->batch[req.batchIndex]; + // Assume: processedFileState[req.asset] will not be erased while the actor is active. + // Note: Insert new items into processedFileState will not invalidate the reference. + state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset]; + + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) + .detail("BatchIndex", req.batchIndex) + .detail("RestoreAsset", req.asset.toString()) + .detail("ProcessedFileVersion", curFilePos.get()) + .detail("Request", req.toString()); + + wait(curFilePos.whenAtLeast(req.prevVersion)); + + state bool isDuplicated = true; + if (curFilePos.get() == req.prevVersion) { + isDuplicated = false; + Version commitVersion = req.version; + MutationsVec mutations(req.mutations); + // Sanity check: mutations in range file is in [beginVersion, endVersion); + // mutations in log file is in [beginVersion, endVersion], both inclusive. + ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion); + // Loader sends the endVersion to ensure all useful versions are sent + ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) || + (!req.isRangeFile && commitVersion <= req.asset.endVersion)); + + for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { + MutationRef mutation = mutations[mIndex]; + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations") + .detail("ApplierNode", self->id()) + .detail("RestoreAsset", req.asset.toString()) + .detail("Version", commitVersion) + .detail("Index", mIndex) + .detail("MutationReceived", mutation.toString()); + batchData->counters.receivedBytes += mutation.totalSize(); + batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified + batchData->counters.receivedMutations += 1; + batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0; + // Sanity check + if (g_network->isSimulated()) { + if (isRangeMutation(mutation)) { + ASSERT(mutation.param1 >= req.asset.range.begin && + mutation.param2 <= req.asset.range.end); // Range mutation's right side is exclusive + } else { + ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end); + } + } + // Note: Log and range mutations may be delivered out of order. Can we handle it? + batchData->stagingKeys.addMutation(mutation, commitVersion); + } + curFilePos.set(req.version); + } + + req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); + return Void(); +} + // The actor may be invovked multiple times and executed async. // No race condition as long as we do not wait or yield when operate the shared data. // Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 4883393da8..e18a3941fb 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -47,15 +47,9 @@ struct StagingKey { Version version; // largest version of set or clear for the key std::map pendingMutations; // mutations not set or clear type - // bool operator < (const StagingKey& rhs) const { - // return std::tie(key, version, type, value) - // } explicit StagingKey() : version(0) {} - explicit StagingKey(MutationRef m, Version version) - : key(m.param1), val(m.param2), type(m.type), version(versoin) {} void add(const MutationRef& m, Version newVersion) { - ASSERT(version > 0); // Only add mutation if (version < newVersion) { if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { key = m.param1; @@ -81,7 +75,6 @@ struct StagingKey { .detail("ExistingKeyType", typeString[type]); } } // else input mutation is old and can be ignored - return; } } @@ -128,6 +121,13 @@ struct ApplierBatchData : public ReferenceCounted { } ~ApplierBatchData() = default; + void addMutation(MutationRef m, Version ver) { + if (stagingKeys.find(m.param1) == stagingKeys.end()) { + stagingKeys.emplace(m.param1, StagingKey()); + } + stagingKeys[m.param1].add(m, ver); + } + void reset() { kvOps.clear(); dbApplier = Optional>(); From acf34319c137b23aa8b6cce2f89c27d59ade4680 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 12 Feb 2020 21:45:29 -0800 Subject: [PATCH 03/22] FastRestore:Applier:Precompute mutations and apply in parallel Precompute mutations received by an applier; Only apply the final result to the destination DB; Execute multiple txns in parallel to apply final results to the destination DB. --- fdbserver/Knobs.cpp | 1 + fdbserver/Knobs.h | 1 + fdbserver/RestoreApplier.actor.cpp | 240 ++++++++++++++++++- fdbserver/RestoreApplier.actor.h | 96 +++++++- fdbserver/workloads/RandomSelector.actor.cpp | 4 +- 5 files changed, 326 insertions(+), 16 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index be3c93ba76..89f2d4bb6c 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -553,6 +553,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; } + init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 200 + 1; } // clang-format on diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 16cff4f74d..3e471a4a36 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -495,6 +495,7 @@ public: int64_t FASTRESTORE_ROLE_LOGGING_DELAY; int64_t FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL; // How quickly to update process metrics for restore int64_t FASTRESTORE_ATOMICOP_WEIGHT; // workload amplication factor for atomic op + int64_t FASTRESTORE_APPLYING_PARALLELISM; // number of outstanding txns writing to dest. DB ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false); }; diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 3b2755b290..28867a11e6 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -153,7 +153,7 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned } } // Note: Log and range mutations may be delivered out of order. Can we handle it? - batchData->stagingKeys.addMutation(mutation, commitVersion); + batchData->addMutation(mutation, commitVersion); } curFilePos.set(req.version); } @@ -162,6 +162,212 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned return Void(); } +// Clear all ranges in input ranges +ACTOR static Future applyClearRangeMutations(Standalone> ranges, Database cx) { + state Reference tr(new ReadYourWritesTransaction(cx)); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + for (auto& range : ranges) { + tr->clear(range); + } + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + return Void(); +} + +// Get keys in imcompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys +ACTOR static Future getAndComputeStagingKeys( + std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { + state Reference tr(new ReadYourWritesTransaction(cx)); + state std::vector>>> fKVs; + std::vector>> fValues; + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + for (auto& key : imcompleteStagingKeys) { + fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); + fValues.push_back(fKVs.back().second); + } + wait(waitForAll(fValues)); + } catch (Error& e) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); + wait(tr->onError(e)); + } + + ASSERT(fKVs.size() == imcompleteStagingKeys.size()); + // TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map + for (auto& kv : fKVs) { + if (!kv.second.get().present()) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("ValueNotPresent", kv.first); + continue; + } else { + std::map::iterator iter = imcompleteStagingKeys[kv.first]; + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get()); + iter->second.add(m, (Version)1); + iter->second.precomputeResult(); + } + } + + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + + return Void(); +} + +ACTOR static Future precomputeMutationsResult(Reference batchData, UID applierID, + int64_t batchIndex, Database cx) { + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Applying clear range mutations"); + ; + // Apply range mutations (i.e., clearRange) to database cx + state std::vector> fClearRanges; + std::vector>> clearBuf; + clearBuf.push_back(Standalone>()); + Standalone> clearRanges = clearBuf.back(); + double curTxnSize = 0; + for (auto& rangeMutation : batchData->stagingKeyRanges) { + KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2); + clearRanges.push_back(clearRanges.arena(), range); + curTxnSize += range.expectedSize(); + if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { + fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx)); + clearBuf.push_back(Standalone>()); + clearRanges = clearBuf.back(); + curTxnSize = 0; + } + } + if (curTxnSize > 0) { + fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx)); + } + + // Apply range mutations (i.e., clearRange) to stagingKeyRanges + for (auto& rangeMutation : batchData->stagingKeyRanges) { + std::map::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1); + std::map::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param1); + while (lb != ub) { + lb->second.add(rangeMutation.mutation, rangeMutation.version); + } + } + + wait(waitForAll(fClearRanges)); + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Getting and computing staging keys"); + + // Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value + std::map::iterator> imcompleteStagingKeys; + std::map::iterator stagingKeyIter = batchData->stagingKeys.begin(); + for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) { + if (!stagingKeyIter->second.hasBaseValue()) { + imcompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter); + } + } + + Future fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID); + + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Compute the other staging keys"); + // Pre-compute pendingMutations to other keys in stagingKeys that has base value + for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end(); + stagingKeyIter++) { + if (stagingKeyIter->second.hasBaseValue()) { + stagingKeyIter->second.precomputeResult(); + } + } + + wait(fGetAndComputeKeys); + + // Sanity check all stagingKeys have been precomputed + ASSERT_WE_THINK(batchData->allKeysPrecomputed()); + + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResultDone", applierID).detail("BatchIndex", batchIndex); + + return Void(); +} + +// Apply mutations in batchData->stagingKeys [begin, end). +ACTOR static Future applyStagingKeysBatch(std::map::iterator begin, + std::map::iterator end, Database cx, + FlowLock* applyStagingKeysBatchLock) { + wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); + state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); + state Reference tr(new ReadYourWritesTransaction(cx)); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + std::map::iterator iter = begin; + while (iter != end) { + if (iter->second.type == MutationRef::SetValue) { + tr->set(iter->second.key, iter->second.val); + } else if (iter->second.type == MutationRef::ClearRange) { + tr->clear(KeyRangeRef(iter->second.key, iter->second.val)); + } else { + ASSERT(false); + } + iter++; + } + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + return Void(); +} + +// Apply mutations in stagingKeys in batches in parallel +ACTOR static Future applyStagingKeys(Reference batchData, UID applierID, int64_t batchIndex, + Database cx) { + std::map::iterator begin = batchData->stagingKeys.begin(); + std::map::iterator cur = begin; + double txnSize = 0; + std::vector> fBatches; + TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex); + while (cur != batchData->stagingKeys.end()) { + txnSize += cur->second.expectedMutationSize(); + if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock)); + begin = cur; + } + cur++; + } + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock)); + + wait(waitForAll(fBatches)); + + TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex); + return Void(); +} + +ACTOR Future applyToDBV2(UID applierID, int64_t batchIndex, Reference batchData, Database cx) { + TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID).detail("BatchIndex", batchIndex); + wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx)); + + wait(applyStagingKeys(batchData, applierID, batchIndex, cx)); + + return Void(); +} + // The actor may be invovked multiple times and executed async. // No race condition as long as we do not wait or yield when operate the shared data. // Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; @@ -546,7 +752,8 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, if (!batchData->dbApplier.present()) { isDuplicated = false; batchData->dbApplier = Never(); - batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx); + // batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx); + batchData->dbApplier = applyToDBV2(self->id(), req.batchIndex, batchData, cx); } ASSERT(batchData->dbApplier.present()); @@ -562,4 +769,31 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); return Void(); -} \ No newline at end of file +} + +// Copy from WriteDuringRead.actor.cpp +Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type) { + Arena arena; + if (type == MutationRef::SetValue) + return value; + else if (type == MutationRef::AddValue) + return doLittleEndianAdd(existingValue, value, arena); + else if (type == MutationRef::AppendIfFits) + return doAppendIfFits(existingValue, value, arena); + else if (type == MutationRef::And) + return doAndV2(existingValue, value, arena); + else if (type == MutationRef::Or) + return doOr(existingValue, value, arena); + else if (type == MutationRef::Xor) + return doXor(existingValue, value, arena); + else if (type == MutationRef::Max) + return doMax(existingValue, value, arena); + else if (type == MutationRef::Min) + return doMinV2(existingValue, value, arena); + else if (type == MutationRef::ByteMin) + return doByteMin(existingValue, value, arena); + else if (type == MutationRef::ByteMax) + return doByteMax(existingValue, value, arena); + ASSERT(false); + return Value(); +} diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index e18a3941fb..2e9d10f896 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -30,6 +30,7 @@ #include #include "flow/Stats.h" #include "fdbclient/FDBTypes.h" +#include "fdbclient/Atomic.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" @@ -40,6 +41,8 @@ #include "flow/actorcompiler.h" // has to be last include +Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type); + struct StagingKey { Key key; // TODO: Maybe not needed? Value val; @@ -47,21 +50,22 @@ struct StagingKey { Version version; // largest version of set or clear for the key std::map pendingMutations; // mutations not set or clear type - explicit StagingKey() : version(0) {} + explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {} void add(const MutationRef& m, Version newVersion) { if (version < newVersion) { if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { key = m.param1; val = m.param2; - type = m.type; + type = (MutationRef::Type)m.type; version = newVersion; } else { if (pendingMutations.find(newVersion) == pendingMutations.end()) { pendingMutations.emplace(newVersion, MutationsVec()); } // TODO: Do we really need deep copy? - pendingMutations[newVersion].push_back_deep(pendingMutations.arena(), m); + MutationsVec& mutations = pendingMutations[newVersion]; + mutations.push_back_deep(mutations.arena(), m); } } else if (version == newVersion) { TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion") @@ -76,13 +80,63 @@ struct StagingKey { } } // else input mutation is old and can be ignored } -} + + void precomputeResult() { + std::map::iterator lb = pendingMutations.lower_bound(version); + if (lb->first == version) { + // Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered + for (int i = 0; i < lb->second.size(); i++) { + MutationRef m = lb->second[i]; + if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { + if (std::tie(type, key, val) != std::tie(m.type, m.param1, m.param2)) { + TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnhandledSituation") + .detail("BufferedType", typeString[type]) + .detail("PendingType", typeString[m.type]) + .detail("BufferedVal", val.toString()) + .detail("PendingVal", m.param2.toString()); + } + } + } + } + while (lb != pendingMutations.end()) { + if (lb->first == version) { + continue; + } + for (auto& atomicOp : lb->second) { + val = applyAtomicOp(val, atomicOp.param2, (MutationRef::Type)atomicOp.type); + } + version = lb->first; + } + } + + // Does the key has at least 1 set or clear mutation to get the base value + bool hasBaseValue() { + if (version > 0) { + ASSERT(type == MutationRef::SetValue || type == MutationRef::ClearRange); + } + return version > 0; + } + + // Has all pendingMutations been pre-applied to the val? + bool hasPrecomputed() { + ASSERT(pendingMutations.rbegin()->first >= pendingMutations.begin()->first); + return version >= pendingMutations.rbegin()->first; + } + + int expectedMutationSize() { return key.size() + val.size(); } +}; struct StagingKeyRange { - KeyRange range; - MutationRef::Type type; // clearrange + Standalone mutation; Version version; -} + + explicit StagingKeyRange(MutationRef m, Version newVersion) : mutation(m), version(newVersion) {} + + bool operator<(const StagingKeyRange& rhs) const { + return std::tie(version, mutation.type, mutation.param1, mutation.param2) < + std::tie(rhs.version, rhs.mutation.type, rhs.mutation.param1, rhs.mutation.param2); + } +}; struct ApplierBatchData : public ReferenceCounted { // processedFileState: key: RestoreAsset; value: largest version of mutation received on the applier @@ -91,6 +145,7 @@ struct ApplierBatchData : public ReferenceCounted { VersionedMutationsMap kvOps; // Mutations at each version std::map stagingKeys; std::set stagingKeyRanges; + FlowLock applyStagingKeysBatchLock; Future pollMetrics; @@ -113,7 +168,8 @@ struct ApplierBatchData : public ReferenceCounted { void addref() { return ReferenceCounted::addref(); } void delref() { return ReferenceCounted::delref(); } - explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) { + explicit ApplierBatchData(UID nodeID, int batchIndex) + : counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM) { pollMetrics = traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex)); @@ -122,10 +178,28 @@ struct ApplierBatchData : public ReferenceCounted { ~ApplierBatchData() = default; void addMutation(MutationRef m, Version ver) { - if (stagingKeys.find(m.param1) == stagingKeys.end()) { - stagingKeys.emplace(m.param1, StagingKey()); + if (!isRangeMutation(m)) { + if (stagingKeys.find(m.param1) == stagingKeys.end()) { + stagingKeys.emplace(m.param1, StagingKey()); + } + stagingKeys[m.param1].add(m, ver); + } else { + stagingKeyRanges.insert(StagingKeyRange(m, ver)); } - stagingKeys[m.param1].add(m, ver); + } + + // Return true if all staging keys have been precomputed + bool allKeysPrecomputed() { + for (auto& stagingKey : stagingKeys) { + if (!stagingKey.second.hasPrecomputed()) { + TraceEvent("FastRestoreApplierAllKeysPrecomputedFalse") + .detail("Key", stagingKey.first) + .detail("BufferedVersion", stagingKey.second.version) + .detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first); + return false; + } + } + return true; } void reset() { diff --git a/fdbserver/workloads/RandomSelector.actor.cpp b/fdbserver/workloads/RandomSelector.actor.cpp index 8e27a0fe55..8e0286fa53 100644 --- a/fdbserver/workloads/RandomSelector.actor.cpp +++ b/fdbserver/workloads/RandomSelector.actor.cpp @@ -288,7 +288,7 @@ struct RandomSelectorWorkload : TestWorkload { myValue = format("%d", deterministicRandom()->randomInt( 0, 10000000 ) ); //TraceEvent("RYOWor").detail("Key",myKeyA).detail("Value", myValue); trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::Or); - + loop { try { tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef()); @@ -311,7 +311,7 @@ struct RandomSelectorWorkload : TestWorkload { myValue = format("%d", deterministicRandom()->randomInt( 0, 10000000 ) ); //TraceEvent("RYOWxor").detail("Key",myKeyA).detail("Value", myValue); trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::Xor); - + loop { try { tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef()); From 4394605b6fe1056d725064cb0a26dd1c13167a7c Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 09:14:59 -0800 Subject: [PATCH 04/22] FastRestore:Applier:Fix:Final result mutation type must be set --- fdbserver/RestoreApplier.actor.h | 1 + 1 file changed, 1 insertion(+) diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 2e9d10f896..7601d59455 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -106,6 +106,7 @@ struct StagingKey { val = applyAtomicOp(val, atomicOp.param2, (MutationRef::Type)atomicOp.type); } version = lb->first; + type = MutationRef::SetValue; // Precomputed result should be set to DB. } } From 238b2cb8e4fcaea53d08f13c2fd593ed5c596e3c Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 09:24:53 -0800 Subject: [PATCH 05/22] FastRestore:Applier:Fix various bugs 1. segmentation error 2. there exist mutations that is not set or clear or atomicOp, precompute result should ignore them. --- fdbserver/RestoreApplier.actor.cpp | 6 +++++- fdbserver/RestoreApplier.actor.h | 26 ++++++++++++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 28867a11e6..9a327d07bb 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -293,6 +293,7 @@ ACTOR static Future precomputeMutationsResult(Reference } } + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysWaitOn", applierID); wait(fGetAndComputeKeys); // Sanity check all stagingKeys have been precomputed @@ -794,6 +795,9 @@ Value applyAtomicOp(Optional existingValue, Value value, MutationRef: return doByteMin(existingValue, value, arena); else if (type == MutationRef::ByteMax) return doByteMax(existingValue, value, arena); - ASSERT(false); + else { + TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("Type", type).detail("TypeStr", typeString[type]); + ASSERT(false); + } return Value(); } diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 7601d59455..418f1940e4 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -82,7 +82,14 @@ struct StagingKey { } void precomputeResult() { + TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult") + .detail("Key", key) + .detail("Version", version) + .detail("LargestPendingVersion", (pendingMutations.empty() ? -1 : pendingMutations.rbegin()->first)); std::map::iterator lb = pendingMutations.lower_bound(version); + if (lb == pendingMutations.end()) { + return; + } if (lb->first == version) { // Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered for (int i = 0; i < lb->second.size(); i++) { @@ -102,8 +109,18 @@ struct StagingKey { if (lb->first == version) { continue; } - for (auto& atomicOp : lb->second) { - val = applyAtomicOp(val, atomicOp.param2, (MutationRef::Type)atomicOp.type); + for (auto& mutation : lb->second) { + if (isAtomicOp((MutationRef::Type) mutation.type)) { + val = applyAtomicOp(val, mutation.param2, (MutationRef::Type)mutation.type); + } else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) { + TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet") + .detail("Type", typeString[mutation.type]) + .detail("Version", lb->first); + } else { + TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation") + .detail("Type", typeString[mutation.type]) + .detail("Version", lb->first); + } } version = lb->first; type = MutationRef::SetValue; // Precomputed result should be set to DB. @@ -120,8 +137,8 @@ struct StagingKey { // Has all pendingMutations been pre-applied to the val? bool hasPrecomputed() { - ASSERT(pendingMutations.rbegin()->first >= pendingMutations.begin()->first); - return version >= pendingMutations.rbegin()->first; + ASSERT(pendingMutations.empty() || pendingMutations.rbegin()->first >= pendingMutations.begin()->first); + return pendingMutations.empty() || version >= pendingMutations.rbegin()->first; } int expectedMutationSize() { return key.size() + val.size(); } @@ -200,6 +217,7 @@ struct ApplierBatchData : public ReferenceCounted { return false; } } + TraceEvent("FastRestoreApplierAllKeysPrecomputed"); return true; } From b008df97ebda62a57ae2db1ec1708746faf4075d Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 10:07:19 -0800 Subject: [PATCH 06/22] FastRestore:Applier:Multiple set-clear mutations at same version --- fdbserver/RestoreApplier.actor.cpp | 2 +- fdbserver/RestoreApplier.actor.h | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 9a327d07bb..6598a89afd 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -796,7 +796,7 @@ Value applyAtomicOp(Optional existingValue, Value value, MutationRef: else if (type == MutationRef::ByteMax) return doByteMax(existingValue, value, arena); else { - TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("Type", type).detail("TypeStr", typeString[type]); + TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("Type", (int) type).detail("TypeName", typeString[type]); ASSERT(false); } return Value(); diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 418f1940e4..994a8a1933 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -73,10 +73,13 @@ struct StagingKey { .detail("NewMutation", m.toString()) .detail("ExistingKeyType", typeString[type]); if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { - TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") - .detail("Version", newVersion) - .detail("NewMutation", m.toString()) - .detail("ExistingKeyType", typeString[type]); + if (m.type != type || m.param2 != val) { + TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val); + } } } // else input mutation is old and can be ignored } From d3c01763d906bc8103333511a721f6d309629b95 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 10:48:36 -0800 Subject: [PATCH 07/22] FastRestore:Applier:Handle version stamped key values --- fdbclient/Atomic.h | 18 +++++++++--------- fdbserver/RestoreApplier.actor.cpp | 16 +++++++++++----- fdbserver/RestoreApplier.actor.h | 19 +++++++++++++++++++ 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/fdbclient/Atomic.h b/fdbclient/Atomic.h index 3e5a71abff..715a7a71c1 100644 --- a/fdbclient/Atomic.h +++ b/fdbclient/Atomic.h @@ -28,11 +28,11 @@ static ValueRef doLittleEndianAdd(const Optional& existingValueOptiona const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef(); if(!existingValue.size()) return otherOperand; if(!otherOperand.size()) return otherOperand; - + uint8_t* buf = new (ar) uint8_t [otherOperand.size()]; int i = 0; int carry = 0; - + for(i = 0; i& existingValueOptiona carry = sum >> 8; } - return StringRef(buf, i); + return StringRef(buf, i); } static ValueRef doAnd(const Optional& existingValueOptional, const ValueRef& otherOperand, Arena& ar) { const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef(); if(!otherOperand.size()) return otherOperand; - + uint8_t* buf = new (ar) uint8_t [otherOperand.size()]; int i = 0; - + for(i = 0; i& existingValueOptional, const Valu uint8_t* buf = new (ar) uint8_t [otherOperand.size()]; int i = 0; - + for(i = 0; i& existingValueOptional, const Val const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef(); if(!existingValue.size()) return otherOperand; if(!otherOperand.size()) return otherOperand; - + uint8_t* buf = new (ar) uint8_t [otherOperand.size()]; int i = 0; - + for(i = 0; i& existingValueOptional, const V static ValueRef doByteMin(const Optional& existingValueOptional, const ValueRef& otherOperand, Arena& ar) { if (!existingValueOptional.present()) return otherOperand; - + const ValueRef& existingValue = existingValueOptional.get(); if (existingValue < otherOperand) return existingValue; diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 6598a89afd..adb1f67800 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -123,6 +123,7 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned if (curFilePos.get() == req.prevVersion) { isDuplicated = false; Version commitVersion = req.version; + uint16_t numVersionStampedKV = 0; MutationsVec mutations(req.mutations); // Sanity check: mutations in range file is in [beginVersion, endVersion); // mutations in log file is in [beginVersion, endVersion], both inclusive. @@ -153,7 +154,12 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned } } // Note: Log and range mutations may be delivered out of order. Can we handle it? - batchData->addMutation(mutation, commitVersion); + if (mutation.type == MutationRef::SetVersionstampedKey || mutation.type == MutationRef::SetVersionstampedValue) { + batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV); + numVersionStampedKV++; + } else { + batchData->addMutation(mutation, commitVersion); + } } curFilePos.set(req.version); } @@ -775,9 +781,7 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, // Copy from WriteDuringRead.actor.cpp Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type) { Arena arena; - if (type == MutationRef::SetValue) - return value; - else if (type == MutationRef::AddValue) + if (type == MutationRef::AddValue) return doLittleEndianAdd(existingValue, value, arena); else if (type == MutationRef::AppendIfFits) return doAppendIfFits(existingValue, value, arena); @@ -795,8 +799,10 @@ Value applyAtomicOp(Optional existingValue, Value value, MutationRef: return doByteMin(existingValue, value, arena); else if (type == MutationRef::ByteMax) return doByteMax(existingValue, value, arena); + else if (type == MutationRef::CompareAndClear) + return doCompareAndClear(existingValue, value, arena); else { - TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("Type", (int) type).detail("TypeName", typeString[type]); + TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("TypeCode", (int) type).detail("TypeName", typeString[type]); ASSERT(false); } return Value(); diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 994a8a1933..32d67a171c 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -52,7 +52,10 @@ struct StagingKey { explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {} + // Add mutation m at newVersion to stagingKey + // Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set void add(const MutationRef& m, Version newVersion) { + ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue); if (version < newVersion) { if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { key = m.param1; @@ -209,6 +212,22 @@ struct ApplierBatchData : public ReferenceCounted { } } + void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) { + if (m.type == MutationRef::SetVersionstampedKey) { + // Assume transactionNumber = 0 does not affect result + TraceEvent(SevDebug, "FastRestoreApplierAddMutation").detail("MutationType", typeString[m.type]).detail("FakedTransactionNumber", numVersionStampedKV); + transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV); + addMutation(m, ver); + } else if (m.type == MutationRef::SetVersionstampedValue) { + // Assume transactionNumber = 0 does not affect result + TraceEvent(SevDebug, "FastRestoreApplierAddMutation").detail("MutationType", typeString[m.type]).detail("FakedTransactionNumber", numVersionStampedKV); + transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV); + addMutation(m, ver); + } else { + ASSERT(false); + } + } + // Return true if all staging keys have been precomputed bool allKeysPrecomputed() { for (auto& stagingKey : stagingKeys) { From 58dad5373b246db20435945a88a8265cc3ec6971 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 11:06:20 -0800 Subject: [PATCH 08/22] FastRestore:Applier:Handle CompareAndClear atomicOp --- fdbserver/RestoreApplier.actor.cpp | 9 ++++----- fdbserver/RestoreApplier.actor.h | 11 +++++++++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index adb1f67800..1fa8787531 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -778,14 +778,15 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, return Void(); } -// Copy from WriteDuringRead.actor.cpp +// Copy from WriteDuringRead.actor.cpp with small modifications +// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue, Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type) { Arena arena; if (type == MutationRef::AddValue) return doLittleEndianAdd(existingValue, value, arena); else if (type == MutationRef::AppendIfFits) return doAppendIfFits(existingValue, value, arena); - else if (type == MutationRef::And) + else if (type == MutationRef::And || type == MutationRef::AndV2) return doAndV2(existingValue, value, arena); else if (type == MutationRef::Or) return doOr(existingValue, value, arena); @@ -793,14 +794,12 @@ Value applyAtomicOp(Optional existingValue, Value value, MutationRef: return doXor(existingValue, value, arena); else if (type == MutationRef::Max) return doMax(existingValue, value, arena); - else if (type == MutationRef::Min) + else if (type == MutationRef::Min || type == MutationRef::MinV2) return doMinV2(existingValue, value, arena); else if (type == MutationRef::ByteMin) return doByteMin(existingValue, value, arena); else if (type == MutationRef::ByteMax) return doByteMax(existingValue, value, arena); - else if (type == MutationRef::CompareAndClear) - return doCompareAndClear(existingValue, value, arena); else { TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("TypeCode", (int) type).detail("TypeName", typeString[type]); ASSERT(false); diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 32d67a171c..e83f3298d2 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -116,9 +116,17 @@ struct StagingKey { continue; } for (auto& mutation : lb->second) { - if (isAtomicOp((MutationRef::Type) mutation.type)) { + if (type == MutationRef::CompareAndClear) { // Special atomicOp + Optional retVal = doCompareAndClear(existingValue, value, arena); + if (!retVal.present()) { + val = key; + type = MutationRef::ClearRange; + } // else no-op + } else if (isAtomicOp((MutationRef::Type) mutation.type)) { val = applyAtomicOp(val, mutation.param2, (MutationRef::Type)mutation.type); + type = MutationRef::SetValue; // Precomputed result should be set to DB. } else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) { + type = MutationRef::SetValue; // Precomputed result should be set to DB. TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet") .detail("Type", typeString[mutation.type]) .detail("Version", lb->first); @@ -129,7 +137,6 @@ struct StagingKey { } } version = lb->first; - type = MutationRef::SetValue; // Precomputed result should be set to DB. } } From b1b44d44770c0a42668b492f75ee0c2c5956691d Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 11:06:29 -0800 Subject: [PATCH 09/22] FastRestore:Applier:Handle CompareAndClear atomicOp --- fdbserver/RestoreApplier.actor.cpp | 2 +- fdbserver/RestoreApplier.actor.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 1fa8787531..373d227b21 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -779,7 +779,7 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, } // Copy from WriteDuringRead.actor.cpp with small modifications -// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue, +// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue, and CompareAndClear Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type) { Arena arena; if (type == MutationRef::AddValue) diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index e83f3298d2..241d09bc4b 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -117,7 +117,8 @@ struct StagingKey { } for (auto& mutation : lb->second) { if (type == MutationRef::CompareAndClear) { // Special atomicOp - Optional retVal = doCompareAndClear(existingValue, value, arena); + Arena arena; + Optional retVal = doCompareAndClear(val, mutation.param2, arena); if (!retVal.present()) { val = key; type = MutationRef::ClearRange; From b5e60585aac45a8a8eb5d666b0d87208022c20ea Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 11:26:56 -0800 Subject: [PATCH 10/22] FastRestore:Applier:Fix precompute mutation result --- fdbserver/RestoreApplier.actor.cpp | 18 +++++++++++++----- fdbserver/RestoreApplier.actor.h | 4 +++- fdbserver/RestoreLoader.actor.cpp | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 373d227b21..323bfdfd5a 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -111,7 +111,7 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned // Note: Insert new items into processedFileState will not invalidate the reference. state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset]; - TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) + TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutations", self->id()) .detail("BatchIndex", req.batchIndex) .detail("RestoreAsset", req.asset.toString()) .detail("ProcessedFileVersion", curFilePos.get()) @@ -165,6 +165,11 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned } req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); + TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutationsDone", self->id()) + .detail("BatchIndex", req.batchIndex) + .detail("RestoreAsset", req.asset.toString()) + .detail("ProcessedFileVersion", curFilePos.get()) + .detail("Request", req.toString()); return Void(); } @@ -238,11 +243,10 @@ ACTOR static Future getAndComputeStagingKeys( ACTOR static Future precomputeMutationsResult(Reference batchData, UID applierID, int64_t batchIndex, Database cx) { + // Apply range mutations (i.e., clearRange) to database cx TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) - .detail("Step", "Applying clear range mutations"); - ; - // Apply range mutations (i.e., clearRange) to database cx + .detail("Step", "Applying clear range mutations to DB"); state std::vector> fClearRanges; std::vector>> clearBuf; clearBuf.push_back(Standalone>()); @@ -264,11 +268,15 @@ ACTOR static Future precomputeMutationsResult(Reference } // Apply range mutations (i.e., clearRange) to stagingKeyRanges + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Applying clear range mutations to staging keys"); for (auto& rangeMutation : batchData->stagingKeyRanges) { std::map::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1); - std::map::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param1); + std::map::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2); while (lb != ub) { lb->second.add(rangeMutation.mutation, rangeMutation.version); + lb++; } } diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 241d09bc4b..d920a9e286 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -70,7 +70,7 @@ struct StagingKey { MutationsVec& mutations = pendingMutations[newVersion]; mutations.push_back_deep(mutations.arena(), m); } - } else if (version == newVersion) { + } else if (version == newVersion) { // Sanity check TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion") .detail("Version", newVersion) .detail("NewMutation", m.toString()) @@ -113,6 +113,7 @@ struct StagingKey { } while (lb != pendingMutations.end()) { if (lb->first == version) { + lb++; continue; } for (auto& mutation : lb->second) { @@ -138,6 +139,7 @@ struct StagingKey { } } version = lb->first; + lb++; } } diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 41aeb58c47..60e3c27849 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -264,7 +264,7 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ .detail("BatchIndex", req.batchIndex) .detail("UseRangeFile", req.useRangeFile); wait(batchStatus->sendAllRanges.get()); - } else if (req.useRangeFile) { + } else { TraceEvent(SevDebug, "FastRestoreSendMutationsSkipDuplicateRangeRequest", self->id()) .detail("BatchIndex", req.batchIndex) .detail("UseRangeFile", req.useRangeFile); From 0b277868116288796d54469d4933583a71a971ac Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 13:17:32 -0800 Subject: [PATCH 11/22] FastRestore:Applier:Minor change for clang-format --- fdbserver/RestoreApplier.actor.cpp | 7 +++++-- fdbserver/RestoreApplier.actor.h | 18 +++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 323bfdfd5a..0fe9d1a866 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -154,7 +154,8 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned } } // Note: Log and range mutations may be delivered out of order. Can we handle it? - if (mutation.type == MutationRef::SetVersionstampedKey || mutation.type == MutationRef::SetVersionstampedValue) { + if (mutation.type == MutationRef::SetVersionstampedKey || + mutation.type == MutationRef::SetVersionstampedValue) { batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV); numVersionStampedKV++; } else { @@ -809,7 +810,9 @@ Value applyAtomicOp(Optional existingValue, Value value, MutationRef: else if (type == MutationRef::ByteMax) return doByteMax(existingValue, value, arena); else { - TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("TypeCode", (int) type).detail("TypeName", typeString[type]); + TraceEvent(SevError, "ApplyAtomicOpUnhandledType") + .detail("TypeCode", (int)type) + .detail("TypeName", typeString[type]); ASSERT(false); } return Value(); diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index d920a9e286..a5ba90b3d9 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -78,10 +78,10 @@ struct StagingKey { if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { if (m.type != type || m.param2 != val) { TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") - .detail("Version", newVersion) - .detail("NewMutation", m.toString()) - .detail("ExistingKeyType", typeString[type]) - .detail("ExitingKeyValue", val); + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val); } } } // else input mutation is old and can be ignored @@ -124,7 +124,7 @@ struct StagingKey { val = key; type = MutationRef::ClearRange; } // else no-op - } else if (isAtomicOp((MutationRef::Type) mutation.type)) { + } else if (isAtomicOp((MutationRef::Type)mutation.type)) { val = applyAtomicOp(val, mutation.param2, (MutationRef::Type)mutation.type); type = MutationRef::SetValue; // Precomputed result should be set to DB. } else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) { @@ -225,12 +225,16 @@ struct ApplierBatchData : public ReferenceCounted { void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) { if (m.type == MutationRef::SetVersionstampedKey) { // Assume transactionNumber = 0 does not affect result - TraceEvent(SevDebug, "FastRestoreApplierAddMutation").detail("MutationType", typeString[m.type]).detail("FakedTransactionNumber", numVersionStampedKV); + TraceEvent(SevDebug, "FastRestoreApplierAddMutation") + .detail("MutationType", typeString[m.type]) + .detail("FakedTransactionNumber", numVersionStampedKV); transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV); addMutation(m, ver); } else if (m.type == MutationRef::SetVersionstampedValue) { // Assume transactionNumber = 0 does not affect result - TraceEvent(SevDebug, "FastRestoreApplierAddMutation").detail("MutationType", typeString[m.type]).detail("FakedTransactionNumber", numVersionStampedKV); + TraceEvent(SevDebug, "FastRestoreApplierAddMutation") + .detail("MutationType", typeString[m.type]) + .detail("FakedTransactionNumber", numVersionStampedKV); transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV); addMutation(m, ver); } else { From 0d668ea0c319d10545d1029a3f97d588817d6975 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 15:39:50 -0800 Subject: [PATCH 12/22] FastRestore:Applier:Add more trace for perf tracking --- fdbserver/Knobs.cpp | 2 +- fdbserver/RestoreApplier.actor.cpp | 27 ++++++++++++++++++--------- fdbserver/RestoreMaster.actor.cpp | 10 +++++----- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 89f2d4bb6c..5f63e72ad5 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -553,7 +553,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; } - init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 200 + 1; } + init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; } // clang-format on diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 0fe9d1a866..db70a9d122 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -247,7 +247,8 @@ ACTOR static Future precomputeMutationsResult(Reference // Apply range mutations (i.e., clearRange) to database cx TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) - .detail("Step", "Applying clear range mutations to DB"); + .detail("Step", "Applying clear range mutations to DB") + .detail("ClearRanges", batchData->stagingKeyRanges.size()); state std::vector> fClearRanges; std::vector>> clearBuf; clearBuf.push_back(Standalone>()); @@ -271,7 +272,8 @@ ACTOR static Future precomputeMutationsResult(Reference // Apply range mutations (i.e., clearRange) to stagingKeyRanges TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) - .detail("Step", "Applying clear range mutations to staging keys"); + .detail("Step", "Applying clear range mutations to staging keys") + .detail("ClearRanges", batchData->stagingKeyRanges.size()); for (auto& rangeMutation : batchData->stagingKeyRanges) { std::map::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1); std::map::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2); @@ -284,7 +286,8 @@ ACTOR static Future precomputeMutationsResult(Reference wait(waitForAll(fClearRanges)); TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) - .detail("Step", "Getting and computing staging keys"); + .detail("Step", "Getting and computing staging keys") + .detail("StagingKeys", batchData->stagingKeys.size()); // Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value std::map::iterator> imcompleteStagingKeys; @@ -299,7 +302,8 @@ ACTOR static Future precomputeMutationsResult(Reference TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) - .detail("Step", "Compute the other staging keys"); + .detail("Step", "Compute the other staging keys") + .detail("StagingKeys", batchData->stagingKeys.size()); // Pre-compute pendingMutations to other keys in stagingKeys that has base value for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) { @@ -322,10 +326,11 @@ ACTOR static Future precomputeMutationsResult(Reference // Apply mutations in batchData->stagingKeys [begin, end). ACTOR static Future applyStagingKeysBatch(std::map::iterator begin, std::map::iterator end, Database cx, - FlowLock* applyStagingKeysBatchLock) { + FlowLock* applyStagingKeysBatchLock, UID applierID) { wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); state Reference tr(new ReadYourWritesTransaction(cx)); + TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first); loop { try { tr->reset(); @@ -358,20 +363,23 @@ ACTOR static Future applyStagingKeys(Reference batchData std::map::iterator cur = begin; double txnSize = 0; std::vector> fBatches; - TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex); + TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size()); while (cur != batchData->stagingKeys.end()) { txnSize += cur->second.expectedMutationSize(); if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { - fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock)); + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID)); begin = cur; + txnSize = 0; } cur++; } - fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock)); + if (begin != batchData->stagingKeys.end()) { + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID)); + } wait(waitForAll(fBatches)); - TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex); + TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size()); return Void(); } @@ -380,6 +388,7 @@ ACTOR Future applyToDBV2(UID applierID, int64_t batchIndex, Reference notifyApplierToApplyMutations(ReferencewhenAtLeast(batchIndex - 1)); - TraceEvent("FastRestoreMasterPhaseApplyToDBStart") + TraceEvent("FastRestoreMasterPhaseApplyToDB") .detail("BatchIndex", batchIndex) .detail("FinishedBatch", finishedBatch->get()); @@ -746,7 +746,7 @@ ACTOR static Future notifyApplierToApplyMutations(Reference> requests; - TraceEvent("FastRestoreMasterPhaseApplyMutations") + TraceEvent("FastRestoreMasterPhaseApplyToDB") .detail("BatchIndex", batchIndex) .detail("Appliers", appliersInterf.size()); for (auto& applier : appliersInterf) { @@ -762,7 +762,7 @@ ACTOR static Future notifyApplierToApplyMutations(ReferenceapplyToDB = getBatchReplies(&RestoreApplierInterface::applyToDB, appliersInterf, requests, &replies, TaskPriority::RestoreApplierWriteDB); } else { - TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations") + TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB") .detail("BatchIndex", batchIndex) .detail("Attention", "Actor should not be invoked twice for the same batch index"); } @@ -775,7 +775,7 @@ ACTOR static Future notifyApplierToApplyMutations(ReferenceapplyStatus[reply.id] == RestoreApplyStatus::Applying) { batchStatus->applyStatus[reply.id] = RestoreApplyStatus::Applied; if (reply.isDuplicated) { - TraceEvent(SevWarn, "FastRestoreNotifyApplierToApplierMutations") + TraceEvent(SevWarn, "FastRestoreMasterPhaseApplyToDB") .detail("Applier", reply.id) .detail("DuplicateRequestReturnEarlier", "Apply db request should have been processed"); } @@ -783,7 +783,7 @@ ACTOR static Future notifyApplierToApplyMutations(ReferenceapplyStatus[applier.first] != RestoreApplyStatus::Applied) { - TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations") + TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB") .detail("Applier", applier.first) .detail("ApplyStatus", batchStatus->applyStatus[applier.first]); } From 53f427c319225aaff08ad56fb991dd4d3c2d4913 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 21:37:40 -0800 Subject: [PATCH 13/22] FastRestore:Applier:fix getAndComputeStagingKeys --- fdbserver/RestoreApplier.actor.cpp | 164 ++++++++++++++++++++++++----- 1 file changed, 135 insertions(+), 29 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index db70a9d122..bbfddd9c2a 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -198,42 +198,140 @@ ACTOR static Future applyClearRangeMutations(Standalone getAndComputeStagingKeys( std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { state Reference tr(new ReadYourWritesTransaction(cx)); - state std::vector>>> fKVs; - std::vector>> fValues; + //state std::vector>>> fKVs; + state std::vector>> fValues; TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) .detail("GetKeys", imcompleteStagingKeys.size()); - try { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - for (auto& key : imcompleteStagingKeys) { - fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); - fValues.push_back(fKVs.back().second); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + for (auto& key : imcompleteStagingKeys) { + //fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); + //fValues.push_back(fKVs.back().second); + fValues.push_back(tr->get(key.first)); + } + wait(waitForAll(fValues)); + break; + } catch (Error& e) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); + wait(tr->onError(e)); + fValues.clear(); } - wait(waitForAll(fValues)); - } catch (Error& e) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - .detail("GetKeys", imcompleteStagingKeys.size()) - .detail("Error", e.what()) - .detail("ErrorCode", e.code()); - wait(tr->onError(e)); } - ASSERT(fKVs.size() == imcompleteStagingKeys.size()); + ASSERT(fValues.size() == imcompleteStagingKeys.size()); // TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map - for (auto& kv : fKVs) { - if (!kv.second.get().present()) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - .detail("ValueNotPresent", kv.first); + int i = 0; + for(auto& key : imcompleteStagingKeys) { + if (!fValues[i].get().present()) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") + .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); continue; - } else { - std::map::iterator iter = imcompleteStagingKeys[kv.first]; - // The key's version ideally should be the most recently committed version. - // But as long as it is > 1 and less than the start version of the version batch, it is the same result. - MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get()); - iter->second.add(m, (Version)1); - iter->second.precomputeResult(); } + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get()); + key.second->second.add(m, (Version)1); + key.second->second.precomputeResult(); + i++; + } + // for (auto& kv : fKVs) { + // if (!kv.second.get().present()) { + // TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + // .detail("ValueNotPresent", kv.first); + // continue; + // } else { + // std::map::iterator iter = imcompleteStagingKeys[kv.first]; + // // The key's version ideally should be the most recently committed version. + // // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + // MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get()); + // iter->second.add(m, (Version)1); + // iter->second.precomputeResult(); + // } + // } + + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + + return Void(); +} + +// ACTOR static Future getAndComputeStagingKeysV2( +// std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { +// state std::vector fRets; +// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) +// .detail("GetKeys", imcompleteStagingKeys.size()); + +// try { +// for (auto& key : imcompleteStagingKeys) { +// Reference tr(new ReadYourWritesTransaction(cx)); +// tr->reset(); +// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); +// tr->setOption(FDBTransactionOptions::LOCK_AWARE); + +// fRets.push_back(map(tr->get(key.first), [](Optional const &val) -> Void() { +// if (val.present()) { +// // The key's version ideally should be the most recently committed version. +// // But as long as it is > 1 and less than the start version of the version batch, it is the same result. +// MutationRef m(MutationRef::SetValue, key.first, val.get()); +// key.second->second.add(m, (Version)1); +// key.second->second.precomputeResult(); +// } else { +// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") +// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); +// } +// }) ); +// } +// wait(waitForAll(fRets)); +// } catch (Error& e) { +// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") +// .detail("GetKeys", imcompleteStagingKeys.size()) +// .detail("Error", e.what()) +// .detail("ErrorCode", e.code()); +// wait(tr->onError(e)); +// } + +// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) +// .detail("GetKeys", imcompleteStagingKeys.size()); + +// return Void(); +// } + + +ACTOR static Future getAndComputeStagingKeysV3( + std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + + try { + for (auto& key : imcompleteStagingKeys) { + Reference tr(new ReadYourWritesTransaction(cx)); + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + Optional val = wait(tr->get(key.first)); + if (val.present()) { + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, key.first, val.get()); + key.second->second.add(m, (Version)1); + key.second->second.precomputeResult(); + } else { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") + .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); + } + } + } catch (Error& e) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); + wait(tr->onError(e)); } TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) @@ -298,7 +396,7 @@ ACTOR static Future precomputeMutationsResult(Reference } } - Future fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID); + Future fGetAndComputeKeys = getAndComputeStagingKeysV3(imcompleteStagingKeys, cx, applierID); TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) @@ -330,6 +428,8 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); state Reference tr(new ReadYourWritesTransaction(cx)); + state int sets = 0; + state int clears = 0; TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first); loop { try { @@ -340,13 +440,19 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera while (iter != end) { if (iter->second.type == MutationRef::SetValue) { tr->set(iter->second.key, iter->second.val); + sets++; } else if (iter->second.type == MutationRef::ClearRange) { tr->clear(KeyRangeRef(iter->second.key, iter->second.val)); + clears++; } else { ASSERT(false); } iter++; + if (sets > 10000000 || clears > 10000000) { + TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears); + } } + TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears); wait(tr->commit()); break; } catch (Error& e) { From b57583a504907bd5194e9700023e9676e4f4bd04 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 22:17:04 -0800 Subject: [PATCH 14/22] FastRestore:Applier:Handle multiple gets in parallel --- fdbserver/RestoreApplier.actor.cpp | 84 +++++++++++++++++------------- 1 file changed, 47 insertions(+), 37 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index bbfddd9c2a..2070238d56 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -200,6 +200,8 @@ ACTOR static Future getAndComputeStagingKeys( state Reference tr(new ReadYourWritesTransaction(cx)); //state std::vector>>> fKVs; state std::vector>> fValues; + state std::vector> values; + state int i = 0; TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) .detail("GetKeys", imcompleteStagingKeys.size()); loop { @@ -212,7 +214,10 @@ ACTOR static Future getAndComputeStagingKeys( //fValues.push_back(fKVs.back().second); fValues.push_back(tr->get(key.first)); } - wait(waitForAll(fValues)); + for(i = 0; i < fValues.size(); i++) { + Optional val = wait(fValues[i]); + values.push_back(val); + } break; } catch (Error& e) { TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") @@ -224,13 +229,18 @@ ACTOR static Future getAndComputeStagingKeys( } } - ASSERT(fValues.size() == imcompleteStagingKeys.size()); + ASSERT(values.size() == imcompleteStagingKeys.size()); // TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map int i = 0; for(auto& key : imcompleteStagingKeys) { - if (!fValues[i].get().present()) { + if (!values[i].present()) { TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); + for(auto& vm : key.second->second.pendingMutations) { + for(auto& m : vm.second) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("PendingMutationVersion", vm.first).detail("PendingMutation", m.toString()); + } + } continue; } // The key's version ideally should be the most recently committed version. @@ -303,42 +313,42 @@ ACTOR static Future getAndComputeStagingKeys( // } -ACTOR static Future getAndComputeStagingKeysV3( - std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { - TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) - .detail("GetKeys", imcompleteStagingKeys.size()); +// ACTOR static Future getAndComputeStagingKeysV3( +// std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { +// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) +// .detail("GetKeys", imcompleteStagingKeys.size()); - try { - for (auto& key : imcompleteStagingKeys) { - Reference tr(new ReadYourWritesTransaction(cx)); - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - Optional val = wait(tr->get(key.first)); - if (val.present()) { - // The key's version ideally should be the most recently committed version. - // But as long as it is > 1 and less than the start version of the version batch, it is the same result. - MutationRef m(MutationRef::SetValue, key.first, val.get()); - key.second->second.add(m, (Version)1); - key.second->second.precomputeResult(); - } else { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") - .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); - } - } - } catch (Error& e) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - .detail("GetKeys", imcompleteStagingKeys.size()) - .detail("Error", e.what()) - .detail("ErrorCode", e.code()); - wait(tr->onError(e)); - } +// try { +// for (auto& key : imcompleteStagingKeys) { +// Reference tr(new ReadYourWritesTransaction(cx)); +// tr->reset(); +// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); +// tr->setOption(FDBTransactionOptions::LOCK_AWARE); +// Optional val = wait(tr->get(key.first)); +// if (val.present()) { +// // The key's version ideally should be the most recently committed version. +// // But as long as it is > 1 and less than the start version of the version batch, it is the same result. +// MutationRef m(MutationRef::SetValue, key.first, val.get()); +// key.second->second.add(m, (Version)1); +// key.second->second.precomputeResult(); +// } else { +// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") +// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); +// } +// } +// } catch (Error& e) { +// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") +// .detail("GetKeys", imcompleteStagingKeys.size()) +// .detail("Error", e.what()) +// .detail("ErrorCode", e.code()); +// wait(tr->onError(e)); +// } - TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) - .detail("GetKeys", imcompleteStagingKeys.size()); +// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) +// .detail("GetKeys", imcompleteStagingKeys.size()); - return Void(); -} +// return Void(); +// } ACTOR static Future precomputeMutationsResult(Reference batchData, UID applierID, int64_t batchIndex, Database cx) { @@ -396,7 +406,7 @@ ACTOR static Future precomputeMutationsResult(Reference } } - Future fGetAndComputeKeys = getAndComputeStagingKeysV3(imcompleteStagingKeys, cx, applierID); + Future fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID); TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) From 3e2c19630abec38cee243396cf7b66c5be5f947f Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 22:44:15 -0800 Subject: [PATCH 15/22] FastRestore:Applier:atomicOp can work on an empty key --- fdbserver/RestoreApplier.actor.cpp | 19 +++++++++++-------- fdbserver/RestoreApplier.actor.h | 6 +++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 2070238d56..cfab2d2839 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -234,21 +234,24 @@ ACTOR static Future getAndComputeStagingKeys( int i = 0; for(auto& key : imcompleteStagingKeys) { if (!values[i].present()) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") + TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); for(auto& vm : key.second->second.pendingMutations) { for(auto& m : vm.second) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("PendingMutationVersion", vm.first).detail("PendingMutation", m.toString()); + TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("PendingMutationVersion", vm.first).detail("PendingMutation", m.toString()); } } + key.second->second.precomputeResult(); + i++; continue; + } else { + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get()); + key.second->second.add(m, (Version)1); + key.second->second.precomputeResult(); + i++; } - // The key's version ideally should be the most recently committed version. - // But as long as it is > 1 and less than the start version of the version batch, it is the same result. - MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get()); - key.second->second.add(m, (Version)1); - key.second->second.precomputeResult(); - i++; } // for (auto& kv : fKVs) { // if (!kv.second.get().present()) { diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index a5ba90b3d9..a6e4177c67 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -125,7 +125,11 @@ struct StagingKey { type = MutationRef::ClearRange; } // else no-op } else if (isAtomicOp((MutationRef::Type)mutation.type)) { - val = applyAtomicOp(val, mutation.param2, (MutationRef::Type)mutation.type); + Optional inputVal; + if (hasBaseValue()) { + inputVal = val; + } + val = applyAtomicOp(inputVal, mutation.param2, (MutationRef::Type)mutation.type); type = MutationRef::SetValue; // Precomputed result should be set to DB. } else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) { type = MutationRef::SetValue; // Precomputed result should be set to DB. From c34a69df32d015997e80d2544202455cc0b3a288 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 23:06:44 -0800 Subject: [PATCH 16/22] FastRestore:Applier:Remove unused func --- fdbserver/RestoreApplier.actor.cpp | 109 +++-------------------------- 1 file changed, 11 insertions(+), 98 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index cfab2d2839..d07a0d06f2 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -214,7 +214,7 @@ ACTOR static Future getAndComputeStagingKeys( //fValues.push_back(fKVs.back().second); fValues.push_back(tr->get(key.first)); } - for(i = 0; i < fValues.size(); i++) { + for (i = 0; i < fValues.size(); i++) { Optional val = wait(fValues[i]); values.push_back(val); } @@ -234,11 +234,16 @@ ACTOR static Future getAndComputeStagingKeys( int i = 0; for(auto& key : imcompleteStagingKeys) { if (!values[i].present()) { - TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") - .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); - for(auto& vm : key.second->second.pendingMutations) { - for(auto& m : vm.second) { - TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("PendingMutationVersion", vm.first).detail("PendingMutation", m.toString()); + TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("Key", key.first) + .detail("Reason", "Not found in DB") + .detail("PendingMutations", key.second->second.pendingMutations.size()) + .detail("StagingKeyType", (int)key.second->second.type); + for (auto& vm : key.second->second.pendingMutations) { + for (auto& m : vm.second) { + TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("PendingMutationVersion", vm.first) + .detail("PendingMutation", m.toString()); } } key.second->second.precomputeResult(); @@ -253,20 +258,6 @@ ACTOR static Future getAndComputeStagingKeys( i++; } } - // for (auto& kv : fKVs) { - // if (!kv.second.get().present()) { - // TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - // .detail("ValueNotPresent", kv.first); - // continue; - // } else { - // std::map::iterator iter = imcompleteStagingKeys[kv.first]; - // // The key's version ideally should be the most recently committed version. - // // But as long as it is > 1 and less than the start version of the version batch, it is the same result. - // MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get()); - // iter->second.add(m, (Version)1); - // iter->second.precomputeResult(); - // } - // } TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) .detail("GetKeys", imcompleteStagingKeys.size()); @@ -274,84 +265,6 @@ ACTOR static Future getAndComputeStagingKeys( return Void(); } -// ACTOR static Future getAndComputeStagingKeysV2( -// std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { -// state std::vector fRets; -// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) -// .detail("GetKeys", imcompleteStagingKeys.size()); - -// try { -// for (auto& key : imcompleteStagingKeys) { -// Reference tr(new ReadYourWritesTransaction(cx)); -// tr->reset(); -// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); -// tr->setOption(FDBTransactionOptions::LOCK_AWARE); - -// fRets.push_back(map(tr->get(key.first), [](Optional const &val) -> Void() { -// if (val.present()) { -// // The key's version ideally should be the most recently committed version. -// // But as long as it is > 1 and less than the start version of the version batch, it is the same result. -// MutationRef m(MutationRef::SetValue, key.first, val.get()); -// key.second->second.add(m, (Version)1); -// key.second->second.precomputeResult(); -// } else { -// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") -// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); -// } -// }) ); -// } -// wait(waitForAll(fRets)); -// } catch (Error& e) { -// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") -// .detail("GetKeys", imcompleteStagingKeys.size()) -// .detail("Error", e.what()) -// .detail("ErrorCode", e.code()); -// wait(tr->onError(e)); -// } - -// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) -// .detail("GetKeys", imcompleteStagingKeys.size()); - -// return Void(); -// } - - -// ACTOR static Future getAndComputeStagingKeysV3( -// std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { -// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) -// .detail("GetKeys", imcompleteStagingKeys.size()); - -// try { -// for (auto& key : imcompleteStagingKeys) { -// Reference tr(new ReadYourWritesTransaction(cx)); -// tr->reset(); -// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); -// tr->setOption(FDBTransactionOptions::LOCK_AWARE); -// Optional val = wait(tr->get(key.first)); -// if (val.present()) { -// // The key's version ideally should be the most recently committed version. -// // But as long as it is > 1 and less than the start version of the version batch, it is the same result. -// MutationRef m(MutationRef::SetValue, key.first, val.get()); -// key.second->second.add(m, (Version)1); -// key.second->second.precomputeResult(); -// } else { -// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") -// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); -// } -// } -// } catch (Error& e) { -// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") -// .detail("GetKeys", imcompleteStagingKeys.size()) -// .detail("Error", e.what()) -// .detail("ErrorCode", e.code()); -// wait(tr->onError(e)); -// } - -// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) -// .detail("GetKeys", imcompleteStagingKeys.size()); - -// return Void(); -// } ACTOR static Future precomputeMutationsResult(Reference batchData, UID applierID, int64_t batchIndex, Database cx) { From fe75a4cafb49cf1ea319d0a4b4e801a077c1c1cc Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 19 Feb 2020 15:22:52 -0800 Subject: [PATCH 17/22] FastRestore:Apply clang-format --- fdbserver/RestoreApplier.actor.cpp | 47 ++++++++++++++++++------------ 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 6c942fa153..8616c9bf28 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -37,7 +37,7 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, Reference self); ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, - Reference self); + Reference self); ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference self, Database cx); @@ -104,7 +104,7 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int // Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; // Only one actor can process mutations from the same file ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, - Reference self) { + Reference self) { state Reference batchData = self->batch[req.batchIndex]; // Assume: processedFileState[req.asset] will not be erased while the actor is active. // Note: Insert new items into processedFileState will not invalidate the reference. @@ -142,7 +142,7 @@ ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersioned batchData->counters.receivedBytes += mutation.totalSize(); batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified batchData->counters.receivedMutations += 1; - batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type) mutation.type) ? 1 : 0; + batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0; // Sanity check if (g_network->isSimulated()) { if (isRangeMutation(mutation)) { @@ -197,7 +197,7 @@ ACTOR static Future applyClearRangeMutations(Standalone getAndComputeStagingKeys( std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { state Reference tr(new ReadYourWritesTransaction(cx)); - //state std::vector>>> fKVs; + // state std::vector>>> fKVs; state std::vector>> fValues; state std::vector> values; state int i = 0; @@ -209,8 +209,8 @@ ACTOR static Future getAndComputeStagingKeys( tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); for (auto& key : imcompleteStagingKeys) { - //fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); - //fValues.push_back(fKVs.back().second); + // fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); + // fValues.push_back(fKVs.back().second); fValues.push_back(tr->get(key.first)); } for (i = 0; i < fValues.size(); i++) { @@ -220,9 +220,9 @@ ACTOR static Future getAndComputeStagingKeys( break; } catch (Error& e) { TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - .detail("GetKeys", imcompleteStagingKeys.size()) - .detail("Error", e.what()) - .detail("ErrorCode", e.code()); + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); wait(tr->onError(e)); fValues.clear(); } @@ -231,7 +231,7 @@ ACTOR static Future getAndComputeStagingKeys( ASSERT(values.size() == imcompleteStagingKeys.size()); // TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map int i = 0; - for(auto& key : imcompleteStagingKeys) { + for (auto& key : imcompleteStagingKeys) { if (!values[i].present()) { TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") .detail("Key", key.first) @@ -264,14 +264,13 @@ ACTOR static Future getAndComputeStagingKeys( return Void(); } - ACTOR static Future precomputeMutationsResult(Reference batchData, UID applierID, int64_t batchIndex, Database cx) { // Apply range mutations (i.e., clearRange) to database cx TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) .detail("Step", "Applying clear range mutations to DB") - .detail("ClearRanges", batchData->stagingKeyRanges.size()); + .detail("ClearRanges", batchData->stagingKeyRanges.size()); state std::vector> fClearRanges; std::vector>> clearBuf; clearBuf.push_back(Standalone>()); @@ -296,7 +295,7 @@ ACTOR static Future precomputeMutationsResult(Reference TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) .detail("Step", "Applying clear range mutations to staging keys") - .detail("ClearRanges", batchData->stagingKeyRanges.size()); + .detail("ClearRanges", batchData->stagingKeyRanges.size()); for (auto& rangeMutation : batchData->stagingKeyRanges) { std::map::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1); std::map::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2); @@ -310,7 +309,7 @@ ACTOR static Future precomputeMutationsResult(Reference TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) .detail("Step", "Getting and computing staging keys") - .detail("StagingKeys", batchData->stagingKeys.size()); + .detail("StagingKeys", batchData->stagingKeys.size()); // Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value std::map::iterator> imcompleteStagingKeys; @@ -326,7 +325,7 @@ ACTOR static Future precomputeMutationsResult(Reference TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) .detail("Step", "Compute the other staging keys") - .detail("StagingKeys", batchData->stagingKeys.size()); + .detail("StagingKeys", batchData->stagingKeys.size()); // Pre-compute pendingMutations to other keys in stagingKeys that has base value for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) { @@ -374,10 +373,16 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera } iter++; if (sets > 10000000 || clears > 10000000) { - TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears); + TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID) + .detail("Begin", begin->first) + .detail("Sets", sets) + .detail("Clears", clears); } } - TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears); + TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID) + .detail("Begin", begin->first) + .detail("Sets", sets) + .detail("Clears", clears); wait(tr->commit()); break; } catch (Error& e) { @@ -394,7 +399,9 @@ ACTOR static Future applyStagingKeys(Reference batchData std::map::iterator cur = begin; double txnSize = 0; std::vector> fBatches; - TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size()); + TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID) + .detail("BatchIndex", batchIndex) + .detail("StagingKeys", batchData->stagingKeys.size()); while (cur != batchData->stagingKeys.end()) { txnSize += cur->second.expectedMutationSize(); if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { @@ -410,7 +417,9 @@ ACTOR static Future applyStagingKeys(Reference batchData wait(waitForAll(fBatches)); - TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID).detail("BatchIndex", batchIndex).detail("StagingKeys", batchData->stagingKeys.size()); + TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID) + .detail("BatchIndex", batchIndex) + .detail("StagingKeys", batchData->stagingKeys.size()); return Void(); } From e4258d73f5b015a8a19e07b7b4a285a637d9d054 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 19 Feb 2020 15:27:59 -0800 Subject: [PATCH 18/22] FastRestore:Applier:Remove applying actors that do not have good perf --- fdbserver/Knobs.cpp | 2 +- fdbserver/RestoreApplier.actor.cpp | 370 ----------------------------- 2 files changed, 1 insertion(+), 371 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 5b8e69523c..5382f37c51 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -552,7 +552,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; } - init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; } + init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; } init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; } init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; } diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 8616c9bf28..9522877b20 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -34,8 +34,6 @@ #include "flow/actorcompiler.h" // This must be the last #include. -ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, - Reference self); ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, Reference self); ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference self, @@ -433,373 +431,6 @@ ACTOR Future applyToDBV2(UID applierID, int64_t batchIndex, Reference handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, - Reference self) { - state Reference batchData = self->batch[req.batchIndex]; - // Assume: processedFileState[req.asset] will not be erased while the actor is active. - // Note: Insert new items into processedFileState will not invalidate the reference. - state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset]; - - TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) - .detail("BatchIndex", req.batchIndex) - .detail("RestoreAsset", req.asset.toString()) - .detail("ProcessedFileVersion", curFilePos.get()) - .detail("Request", req.toString()); - - wait(curFilePos.whenAtLeast(req.prevVersion)); - - state bool isDuplicated = true; - if (curFilePos.get() == req.prevVersion) { - isDuplicated = false; - Version commitVersion = req.version; - MutationsVec mutations(req.mutations); - // Sanity check: mutations in range file is in [beginVersion, endVersion); - // mutations in log file is in [beginVersion, endVersion], both inclusive. - ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion); - // Loader sends the endVersion to ensure all useful versions are sent - ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) || - (!req.isRangeFile && commitVersion <= req.asset.endVersion)); - - if (batchData->kvOps.find(commitVersion) == batchData->kvOps.end()) { - batchData->kvOps.insert(std::make_pair(commitVersion, MutationsVec())); - } - for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { - MutationRef mutation = mutations[mIndex]; - TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations") - .detail("ApplierNode", self->id()) - .detail("RestoreAsset", req.asset.toString()) - .detail("Version", commitVersion) - .detail("Index", mIndex) - .detail("MutationReceived", mutation.toString()); - batchData->counters.receivedBytes += mutation.totalSize(); - batchData->counters.receivedWeightedBytes += mutation.weightedTotalSize(); // atomicOp will be amplified - batchData->counters.receivedMutations += 1; - batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0; - // Sanity check - if (g_network->isSimulated()) { - if (isRangeMutation(mutation)) { - ASSERT(mutation.param1 >= req.asset.range.begin && - mutation.param2 <= req.asset.range.end); // Range mutation's right side is exclusive - } else { - ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end); - } - } - batchData->kvOps[commitVersion].push_back_deep(batchData->kvOps[commitVersion].arena(), mutation); - // TODO: What if log file's mutations are delivered out-of-order (behind) the range file's mutations?! - } - curFilePos.set(req.version); - } - - req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); - return Void(); -} - -// Progress and checkpoint for applying (atomic) mutations in transactions to DB -struct DBApplyProgress { - // Mutation state in the current uncommitted transaction - VersionedMutationsMap::iterator curItInCurTxn; - int curIndexInCurTxn; - - // Save the starting point for current txn to handle (commit_unknown_result) error in txn commit - // startItInUncommittedTxn is starting iterator in the most recent uncommitted (and failed) txn - // startIndexInUncommittedTxn is start index in the most recent uncommitted (and failed) txn. - // Note: Txns have different number of mutations - VersionedMutationsMap::iterator startItInUncommittedTxn; - int startIndexInUncommittedTxn; - - // State to decide if a txn succeeds or not when txn error (commit_unknown_result) happens; - // curTxnId: The id of the current uncommitted txn, which monotonically increase for each successful transaction - // uncommittedTxnId: The id of the most recent succeeded txn. Used to recover the failed txn id in retry - // lastTxnHasError: Does the last txn has error. TODO: Only need to handle txn_commit_unknown error - Version curTxnId; - Version uncommittedTxnId; - bool lastTxnHasError; - - // Decide when to commit a transaction. We buffer enough mutations in a txn before commit the txn - bool startNextVersion; // The next txn will include mutations in next version - int numAtomicOps; // Status counter - double txnBytes; // Decide when to commit a txn - double txnMutations; // Status counter - - Reference batchData; - UID applierId; - - DBApplyProgress() = default; - explicit DBApplyProgress(UID applierId, Reference batchData) - : applierId(applierId), batchData(batchData), curIndexInCurTxn(0), startIndexInUncommittedTxn(0), curTxnId(0), - uncommittedTxnId(0), lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), txnBytes(0), - txnMutations(0) { - curItInCurTxn = batchData->kvOps.begin(); - while (curItInCurTxn != batchData->kvOps.end() && curItInCurTxn->second.empty()) { - curItInCurTxn++; - } - startItInUncommittedTxn = curItInCurTxn; - } - - // Has all mutations been committed? - bool isDone() { return curItInCurTxn == batchData->kvOps.end(); } - - // Set cursor for next mutation - void nextMutation() { - curIndexInCurTxn++; - while (curItInCurTxn != batchData->kvOps.end() && curIndexInCurTxn >= curItInCurTxn->second.size()) { - curIndexInCurTxn = 0; - curItInCurTxn++; - startNextVersion = true; - } - } - - // Setup for the next transaction; This should be done after nextMutation() - void nextTxn() { - txnBytes = 0; - txnMutations = 0; - numAtomicOps = 0; - lastTxnHasError = false; - startNextVersion = false; - - curTxnId++; - - startIndexInUncommittedTxn = curIndexInCurTxn; - startItInUncommittedTxn = curItInCurTxn; - uncommittedTxnId = curTxnId; - } - - // Rollback to the starting point of the uncommitted-and-failed transaction to - // re-execute uncommitted txn - void rollback() { - TraceEvent(SevWarn, "FastRestoreApplyTxnError") - .detail("TxnStatusFailed", curTxnId) - .detail("ApplierApplyToDB", applierId) - .detail("UncommittedTxnId", uncommittedTxnId) - .detail("CurIteratorVersion", curItInCurTxn->first) - .detail("StartIteratorVersionInUncommittedTxn", startItInUncommittedTxn->first) - .detail("CurrentIndexInFailedTxn", curIndexInCurTxn) - .detail("StartIndexInUncommittedTxn", startIndexInUncommittedTxn) - .detail("NumIncludedAtomicOps", numAtomicOps); - curItInCurTxn = startItInUncommittedTxn; - curIndexInCurTxn = startIndexInUncommittedTxn; - curTxnId = uncommittedTxnId; - - numAtomicOps = 0; - txnBytes = 0; - txnMutations = 0; - startNextVersion = false; - lastTxnHasError = false; - } - - bool shouldCommit() { - return (!lastTxnHasError && (startNextVersion || txnBytes >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES || - curItInCurTxn == batchData->kvOps.end())); - } - - bool hasError() { return lastTxnHasError; } - - void setTxnError(Error& e) { - TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError") - .detail("TxnStatus", "?") - .detail("ApplierApplyToDB", applierId) - .detail("TxnId", curTxnId) - .detail("StartIndexInCurrentTxn", curIndexInCurTxn) - .detail("Version", curItInCurTxn->first) - .error(e, true); - lastTxnHasError = true; - } - - MutationRef getCurrentMutation() { - ASSERT_WE_THINK(curIndexInCurTxn < curItInCurTxn->second.size()); - return curItInCurTxn->second[curIndexInCurTxn]; - } -}; - -ACTOR Future applyToDB(UID applierID, int64_t batchIndex, Reference batchData, Database cx) { - // state variables must be defined at the start of actor to be initialized in the actor constructor - state std::string typeStr = ""; - state Reference tr(new ReadYourWritesTransaction(cx)); - state DBApplyProgress progress(applierID, batchData); - - TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID) - .detail("BatchIndex", batchIndex) - .detail("FromVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.begin()->first) - .detail("EndVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.rbegin()->first); - - // Assume the process will not crash when it apply mutations to DB. The reply message can be lost though - if (batchData->kvOps.empty()) { - TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID) - .detail("BatchIndex", batchIndex) - .detail("Reason", "NoMutationAtVersions"); - return Void(); - } - - batchData->sanityCheckMutationOps(); - - if (progress.isDone()) { - TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID) - .detail("BatchIndex", batchIndex) - .detail("Reason", "NoMutationAtVersions"); - return Void(); - } - - // Sanity check the restoreApplierKeys, which should be empty at this point - loop { - try { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - Key begin = restoreApplierKeyFor(applierID, batchIndex, 0); - Key end = restoreApplierKeyFor(applierID, batchIndex, std::numeric_limits::max()); - Standalone txnIds = wait(tr->getRange(KeyRangeRef(begin, end), CLIENT_KNOBS->TOO_MANY)); - if (txnIds.size() > 0) { - TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean").detail("TxnIds", txnIds.size()); - for (auto& kv : txnIds) { - UID id; - int64_t index; - Version txnId; - std::tie(id, index, txnId) = decodeRestoreApplierKey(kv.key); - TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean") - .detail("Applier", id) - .detail("BatchIndex", index) - .detail("ResidueTxnID", txnId); - } - } - break; - } catch (Error& e) { - wait(tr->onError(e)); - } - } - - loop { // Transaction retry loop - try { - // Check if the transaction succeeds - if (progress.hasError()) { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - Optional txnSucceeded = - wait(tr->get(restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId))); - if (!txnSucceeded.present()) { - progress.rollback(); - continue; - } else { - TraceEvent(SevWarn, "FastRestoreApplyTxnError") - .detail("TxnStatusSucceeded", progress.curTxnId) - .detail("ApplierApplyToDB", applierID) - .detail("CurIteratorVersion", progress.curItInCurTxn->first) - .detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size()) - .detail("CurrentIndexInSucceedTxn", progress.curIndexInCurTxn) - .detail("NumIncludedAtomicOps", progress.numAtomicOps); - // Txn succeeded and exectue the same logic when txn succeeds - } - } else { // !lastTxnHasError: accumulate mutations in a txn - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - TraceEvent(SevFRMutationInfo, "FastRestore_ApplierTxn") - .detail("ApplierApplyToDB", applierID) - .detail("TxnId", progress.curTxnId) - .detail("CurrentIndexInCurrentTxn", progress.curIndexInCurTxn) - .detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size()) - .detail("Version", progress.curItInCurTxn->first); - - // restoreApplierKeyFor(self->id(), curTxnId) to tell if txn succeeds at an unknown error - tr->set(restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId), restoreApplierTxnValue); - - while (1) { // Loop: Accumulate mutations in a transaction - MutationRef m = progress.getCurrentMutation(); - - if (m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP) { - typeStr = typeString[m.type]; - } else { - TraceEvent(SevError, "FastRestore").detail("InvalidMutationType", m.type); - } - - TraceEvent(SevFRMutationInfo, "FastRestore") - .detail("ApplierApplyToDB", applierID) - .detail("Version", progress.curItInCurTxn->first) - .detail("Index", progress.curIndexInCurTxn) - .detail("Mutation", m.toString()) - .detail("MutationSize", m.totalSize()) - .detail("TxnSize", progress.txnBytes); - if (m.type == MutationRef::SetValue) { - tr->set(m.param1, m.param2); - } else if (m.type == MutationRef::ClearRange) { - KeyRangeRef mutationRange(m.param1, m.param2); - tr->clear(mutationRange); - } else if (isAtomicOp((MutationRef::Type)m.type)) { - tr->atomicOp(m.param1, m.param2, m.type); - progress.numAtomicOps++; - } else { - TraceEvent(SevError, "FastRestore") - .detail("UnhandledMutationType", m.type) - .detail("TypeName", typeStr); - } - - progress.txnBytes += m.totalSize(); // Changed expectedSize to totalSize - progress.txnMutations += 1; - - progress.nextMutation(); // Prepare for the next mutation - // commit per FASTRESTORE_TXN_BATCH_MAX_BYTES bytes; and commit does not cross version boundary - if (progress.shouldCommit()) { - break; // Got enough mutation in the txn - } - } - } // !lastTxnHasError - - // Commit the txn and prepare the starting point for next txn - if (progress.shouldCommit()) { - wait(tr->commit()); - // Update status counter appliedWeightedBytes, appliedMutations, atomicOps - batchData->counters.appliedWeightedBytes += progress.txnBytes; - batchData->counters.appliedMutations += progress.txnMutations; - batchData->counters.appliedAtomicOps += progress.numAtomicOps; - batchData->counters.appliedTxns += 1; - } - - if (progress.isDone()) { // Are all mutations processed? - break; - } - progress.nextTxn(); - } catch (Error& e) { - TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError") - .detail("TxnStatus", "?") - .detail("ApplierApplyToDB", applierID) - .detail("TxnId", progress.curTxnId) - .detail("CurrentIndexInCurrentTxn", progress.curIndexInCurTxn) - .detail("Version", progress.curItInCurTxn->first) - .error(e, true); - progress.lastTxnHasError = true; - wait(tr->onError(e)); - } - } - - TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID) - .detail("BatchIndex", batchIndex) - .detail("CleanupCurTxnIds", progress.curTxnId); - // clean up txn ids - loop { - try { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - // Clear txnIds in [0, progress.curTxnId). We add 100 to curTxnId just to be safe. - tr->clear(KeyRangeRef(restoreApplierKeyFor(applierID, batchIndex, 0), - restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId + 100))); - wait(tr->commit()); - break; - } catch (Error& e) { - wait(tr->onError(e)); - } - } - // House cleaning - batchData->kvOps.clear(); - TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex); - - return Void(); -} - ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference self, Database cx) { // Ensure batch (i-1) is applied before batch i @@ -816,7 +447,6 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, if (!batchData->dbApplier.present()) { isDuplicated = false; batchData->dbApplier = Never(); - // batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx); batchData->dbApplier = applyToDBV2(self->id(), req.batchIndex, batchData, cx); } From 7897b1658ffc1c29860a06775958fd85ae9a195e Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 19 Feb 2020 15:29:14 -0800 Subject: [PATCH 19/22] FastRestore:Applier:Rename new apply actor names --- fdbserver/RestoreApplier.actor.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 9522877b20..407259a526 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -34,8 +34,8 @@ #include "flow/actorcompiler.h" // This must be the last #include. -ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, - Reference self); +ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, + Reference self); ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, Reference self, Database cx); @@ -60,7 +60,7 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int when(RestoreSendVersionedMutationsRequest req = waitNext(applierInterf.sendMutationVector.getFuture())) { requestTypeStr = "sendMutationVector"; - actors.add(handleSendMutationVectorRequestV2(req, self)); + actors.add(handleSendMutationVectorRequest(req, self)); } when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) { requestTypeStr = "applyToDB"; @@ -101,8 +101,8 @@ ACTOR Future restoreApplierCore(RestoreApplierInterface applierInterf, int // No race condition as long as we do not wait or yield when operate the shared data. // Multiple such actors can run on different fileIDs, because mutations in different files belong to different versions; // Only one actor can process mutations from the same file -ACTOR static Future handleSendMutationVectorRequestV2(RestoreSendVersionedMutationsRequest req, - Reference self) { +ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMutationsRequest req, + Reference self) { state Reference batchData = self->batch[req.batchIndex]; // Assume: processedFileState[req.asset] will not be erased while the actor is active. // Note: Insert new items into processedFileState will not invalidate the reference. @@ -421,7 +421,9 @@ ACTOR static Future applyStagingKeys(Reference batchData return Void(); } -ACTOR Future applyToDBV2(UID applierID, int64_t batchIndex, Reference batchData, Database cx) { +// Write mutations to the destination DB +ACTOR Future writeMutationsToDB(UID applierID, int64_t batchIndex, Reference batchData, + Database cx) { TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID).detail("BatchIndex", batchIndex); wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx)); @@ -447,7 +449,7 @@ ACTOR static Future handleApplyToDBRequest(RestoreVersionBatchRequest req, if (!batchData->dbApplier.present()) { isDuplicated = false; batchData->dbApplier = Never(); - batchData->dbApplier = applyToDBV2(self->id(), req.batchIndex, batchData, cx); + batchData->dbApplier = writeMutationsToDB(self->id(), req.batchIndex, batchData, cx); } ASSERT(batchData->dbApplier.present()); From d5d26f589f64c8989672f9d1b36c1d1ac78a713f Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 19 Feb 2020 15:43:38 -0800 Subject: [PATCH 20/22] FastRestore:Cosmetic change to improve code readability --- fdbserver/RestoreApplier.actor.cpp | 6 +----- fdbserver/RestoreApplier.actor.h | 7 +++++++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 407259a526..49b71d14ff 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -131,7 +131,7 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMu for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { MutationRef mutation = mutations[mIndex]; - TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations") + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) .detail("ApplierNode", self->id()) .detail("RestoreAsset", req.asset.toString()) .detail("Version", commitVersion) @@ -195,7 +195,6 @@ ACTOR static Future applyClearRangeMutations(Standalone getAndComputeStagingKeys( std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { state Reference tr(new ReadYourWritesTransaction(cx)); - // state std::vector>>> fKVs; state std::vector>> fValues; state std::vector> values; state int i = 0; @@ -207,8 +206,6 @@ ACTOR static Future getAndComputeStagingKeys( tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); for (auto& key : imcompleteStagingKeys) { - // fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); - // fValues.push_back(fKVs.back().second); fValues.push_back(tr->get(key.first)); } for (i = 0; i < fValues.size(); i++) { @@ -227,7 +224,6 @@ ACTOR static Future getAndComputeStagingKeys( } ASSERT(values.size() == imcompleteStagingKeys.size()); - // TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map int i = 0; for (auto& key : imcompleteStagingKeys) { if (!values[i].present()) { diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 7433937cdd..d2c940672a 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -43,6 +43,10 @@ Value applyAtomicOp(Optional existingValue, Value value, MutationRef::Type type); +// Key whose mutations are buffered on applier. +// key, value, type and version defines the parsed mutation at version. +// pendingMutations has all versioned mutations to be applied. +// Mutations in pendingMutations whose version is below the version in StagingKey can be ignored in applying phase. struct StagingKey { Key key; // TODO: Maybe not needed? Value val; @@ -87,6 +91,7 @@ struct StagingKey { } // else input mutation is old and can be ignored } + // Precompute the final value of the key. void precomputeResult() { TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult") .detail("Key", key) @@ -164,6 +169,8 @@ struct StagingKey { int expectedMutationSize() { return key.size() + val.size(); } }; +// The range mutation received on applier. +// Range mutations should be applied both to the destination DB and to the StagingKeys struct StagingKeyRange { Standalone mutation; Version version; From 898a1ea3edc776643c3b92485dd18bf9135404c1 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Wed, 19 Feb 2020 16:50:42 -0800 Subject: [PATCH 21/22] FastRestore:Applier:Handle mutations at same version --- fdbserver/RestoreApplier.actor.h | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index d2c940672a..d41713904d 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -79,14 +79,32 @@ struct StagingKey { .detail("Version", newVersion) .detail("NewMutation", m.toString()) .detail("ExistingKeyType", typeString[type]); - if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { - if (m.type != type || m.param2 != val) { - TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") + if (m.type == MutationRef::SetValue) { + if (type == MutationRef::SetValue) { + if (m.param2 != val) { + TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val) + .detail("Investigate", + "Why would backup have two sets with different value at same version"); + } // else {} Backup has duplicate set at the same version + } else { + TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionOverride") .detail("Version", newVersion) .detail("NewMutation", m.toString()) .detail("ExistingKeyType", typeString[type]) .detail("ExitingKeyValue", val); + type = (MutationRef::Type)m.type; + val = m.param2; } + } else if (m.type == MutationRef::ClearRange) { + TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionSkipped") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val); } } // else input mutation is old and can be ignored } From 6bd4703a9f1b0cee8f0446cc06e89acd9c122b5e Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 20 Feb 2020 14:15:53 -0800 Subject: [PATCH 22/22] FastRestore:Resolve review comments --- fdbserver/RestoreApplier.actor.cpp | 13 ++++--------- fdbserver/RestoreApplier.actor.h | 8 +++----- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 49b71d14ff..e398e0ae8c 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -126,8 +126,7 @@ ACTOR static Future handleSendMutationVectorRequest(RestoreSendVersionedMu // mutations in log file is in [beginVersion, endVersion], both inclusive. ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion); // Loader sends the endVersion to ensure all useful versions are sent - ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) || - (!req.isRangeFile && commitVersion <= req.asset.endVersion)); + ASSERT_WE_THINK(commitVersion <= req.asset.endVersion); for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { MutationRef mutation = mutations[mIndex]; @@ -196,7 +195,6 @@ ACTOR static Future getAndComputeStagingKeys( std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { state Reference tr(new ReadYourWritesTransaction(cx)); state std::vector>> fValues; - state std::vector> values; state int i = 0; TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) .detail("GetKeys", imcompleteStagingKeys.size()); @@ -208,10 +206,7 @@ ACTOR static Future getAndComputeStagingKeys( for (auto& key : imcompleteStagingKeys) { fValues.push_back(tr->get(key.first)); } - for (i = 0; i < fValues.size(); i++) { - Optional val = wait(fValues[i]); - values.push_back(val); - } + wait(waitForAll(fValues)); break; } catch (Error& e) { TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") @@ -223,10 +218,10 @@ ACTOR static Future getAndComputeStagingKeys( } } - ASSERT(values.size() == imcompleteStagingKeys.size()); + ASSERT(fValues.size() == imcompleteStagingKeys.size()); int i = 0; for (auto& key : imcompleteStagingKeys) { - if (!values[i].present()) { + if (!fValues[i].get().present()) { TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") .detail("Key", key.first) .detail("Reason", "Not found in DB") diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index d41713904d..6782186650 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -29,8 +29,8 @@ #include #include "flow/Stats.h" -#include "fdbclient/FDBTypes.h" #include "fdbclient/Atomic.h" +#include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/Locality.h" @@ -241,10 +241,8 @@ struct ApplierBatchData : public ReferenceCounted { void addMutation(MutationRef m, Version ver) { if (!isRangeMutation(m)) { - if (stagingKeys.find(m.param1) == stagingKeys.end()) { - stagingKeys.emplace(m.param1, StagingKey()); - } - stagingKeys[m.param1].add(m, ver); + auto item = stagingKeys.emplace(m.param1, StagingKey()); + item.first->second.add(m, ver); } else { stagingKeyRanges.insert(StagingKeyRange(m, ver)); }