From a5af214861f8b7facd2eb9db9e3eb1b10a6fddec Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 8 Jul 2020 09:19:05 -0700 Subject: [PATCH] Use KeyRangeMap for splitMutation This simplifies code by reuse KeyRangeMap class. --- fdbserver/RestoreLoader.actor.cpp | 225 +++++++++--------- tests/CMakeLists.txt | 1 - .../slow/ParallelRestoreTestSplitMutation.txt | 7 - 3 files changed, 113 insertions(+), 120 deletions(-) delete mode 100644 tests/slow/ParallelRestoreTestSplitMutation.txt diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 821f2ebc31..0522dfacba 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -37,8 +37,8 @@ typedef std::map, std::pair, uint32_t>> SerializedMutationListMap; std::vector getApplierIDs(std::map& rangeToApplier); -void splitMutation(std::map* pRangeToApplier, MutationRef m, Arena& mvector_arena, - VectorRef& mvector, Arena& nodeIDs_arena, VectorRef& nodeIDs); +void splitMutation(KeyRangeMap& krMap, MutationRef m, Arena& mvector_arena, VectorRef& mvector, + Arena& nodeIDs_arena, VectorRef& nodeIDs); void _parseSerializedMutation(KeyRangeMap* pRangeVersions, std::map::iterator kvOpsIter, SerializedMutationListMap* mutationMap, @@ -473,6 +473,19 @@ ACTOR Future handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ return Void(); } +void buildApplierRangeMap(KeyRangeMap* krMap, std::map* pRangeToApplier) { + std::map::iterator beginKey = pRangeToApplier->begin(); + std::map::iterator endKey = std::next(beginKey, 1); + while (endKey != pRangeToApplier->end()) { + krMap->insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second); + beginKey = endKey; + endKey++; + } + if (beginKey != pRangeToApplier->end()) { + krMap->insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second); + } +} + // Assume: kvOps data are from the same RestoreAsset. // Input: pkvOps: versioned kv mutation for the asset in the version batch (batchIndex) // isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation; @@ -517,6 +530,8 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat for (auto& applierID : applierIDs) { applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec(); } + KeyRangeMap krMap; + buildApplierRangeMap(&krMap, pRangeToApplier); for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) { commitVersion = kvOp->first; ASSERT(commitVersion.version >= asset.beginVersion); @@ -529,8 +544,7 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat Standalone> nodeIDs; // Because using a vector of mutations causes overhead, and the range mutation should happen rarely; // We handle the range mutation and key mutation differently for the benefit of avoiding memory copy - splitMutation(pRangeToApplier, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), - nodeIDs.contents()); + splitMutation(krMap, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents()); ASSERT(mvector.size() == nodeIDs.size()); if (MUTATION_TRACKING_ENABLED) { @@ -625,59 +639,21 @@ ACTOR Future sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat return Void(); } -void splitMutation(std::map* pRangeToApplier, MutationRef m, Arena& mvector_arena, - VectorRef& mvector, Arena& nodeIDs_arena, VectorRef& nodeIDs) { +// Splits a clear range mutation for Appliers and puts results of splitted mutations and +// Applier IDs into "mvector" and "nodeIDs" on return. +void splitMutation(KeyRangeMap& krMap, MutationRef m, Arena& mvector_arena, VectorRef& mvector, + Arena& nodeIDs_arena, VectorRef& nodeIDs) { TraceEvent(SevDebug, "FastRestoreSplitMutation").detail("Mutation", m.toString()); - // mvector[i] should be mapped to nodeID[i] ASSERT(mvector.empty()); ASSERT(nodeIDs.empty()); - // key range [m->param1, m->param2) - std::map::iterator itlow, itup; // we will return [itlow, itup) - itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1 - if (itlow == pRangeToApplier->end()) { - --itlow; - mvector.push_back_deep(mvector_arena, m); - nodeIDs.push_back(nodeIDs_arena, itlow->second); - return; - } - if (itlow->first > m.param1) { - if (itlow != pRangeToApplier->begin()) { - --itlow; - } - } - - itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2. - ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2); - - std::map::iterator itApplier; - while (itlow != itup) { - Standalone curm; // current mutation - curm.type = m.type; - // The first split mutation should starts with m.first. - // The later ones should start with the rangeToApplier boundary. - if (m.param1 > itlow->first) { - curm.param1 = m.param1; - } else { - curm.param1 = itlow->first; - } - itApplier = itlow; - itlow++; - if (itlow == itup) { - ASSERT(m.param2 <= normalKeys.end); - curm.param2 = m.param2; - } else if (m.param2 < itlow->first) { - UNREACHABLE(); - curm.param2 = m.param2; - } else { - curm.param2 = itlow->first; - } - ASSERT(curm.param1 <= curm.param2); - // itup > m.param2: (itup-1) may be out of mutation m's range - // Ensure the added mutations have overlap with mutation m - if (m.param1 < curm.param2 && m.param2 > curm.param1) { - mvector.push_back_deep(mvector_arena, curm); - nodeIDs.push_back(nodeIDs_arena, itApplier->second); - } + auto r = krMap.intersectingRanges(KeyRangeRef(m.param1, m.param2)); + for (auto i = r.begin(); i != r.end(); ++i) { + // Calculate the overlap range + KeyRef rangeBegin = m.param1 > i->range().begin ? m.param1 : i->range().begin; + KeyRef rangeEnd = m.param2 < i->range().end ? m.param2 : i->range().end; + KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd)); + mvector.push_back_deep(mvector_arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd)); + nodeIDs.push_back(nodeIDs_arena, i->value()); } } @@ -1007,6 +983,63 @@ ACTOR Future handleFinishVersionBatchRequest(RestoreVersionBatchRequest re return Void(); } +namespace { + +void oldSplitMutation(std::map* pRangeToApplier, MutationRef m, Arena& mvector_arena, + VectorRef& mvector, Arena& nodeIDs_arena, VectorRef& nodeIDs) { + // mvector[i] should be mapped to nodeID[i] + ASSERT(mvector.empty()); + ASSERT(nodeIDs.empty()); + // key range [m->param1, m->param2) + std::map::iterator itlow, itup; // we will return [itlow, itup) + itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1 + if (itlow == pRangeToApplier->end()) { + --itlow; + mvector.push_back_deep(mvector_arena, m); + nodeIDs.push_back(nodeIDs_arena, itlow->second); + return; + } + if (itlow->first > m.param1) { + if (itlow != pRangeToApplier->begin()) { + --itlow; + } + } + + itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2. + ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2); + + std::map::iterator itApplier; + while (itlow != itup) { + Standalone curm; // current mutation + curm.type = m.type; + // The first split mutation should starts with m.first. + // The later ones should start with the rangeToApplier boundary. + if (m.param1 > itlow->first) { + curm.param1 = m.param1; + } else { + curm.param1 = itlow->first; + } + itApplier = itlow; + itlow++; + if (itlow == itup) { + ASSERT(m.param2 <= normalKeys.end); + curm.param2 = m.param2; + } else if (m.param2 < itlow->first) { + UNREACHABLE(); + curm.param2 = m.param2; + } else { + curm.param2 = itlow->first; + } + ASSERT(curm.param1 <= curm.param2); + // itup > m.param2: (itup-1) may be out of mutation m's range + // Ensure the added mutations have overlap with mutation m + if (m.param1 < curm.param2 && m.param2 > curm.param1) { + mvector.push_back_deep(mvector_arena, curm); + nodeIDs.push_back(nodeIDs_arena, itApplier->second); + } + } +} + // Test splitMutation TEST_CASE("/FastRestore/RestoreLoader/splitMutation") { std::map rangeToApplier; @@ -1028,77 +1061,45 @@ TEST_CASE("/FastRestore/RestoreLoader/splitMutation") { Key endK = k1 < k2 ? k2 : k1; Standalone mutation(MutationRef(MutationRef::ClearRange, beginK.contents(), endK.contents())); - // Method 1: Use splitMutation - splitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents()); + // Method 1: Use old splitMutation + oldSplitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(), + nodeIDs.contents()); ASSERT(mvector.size() == nodeIDs.size()); - // Method 2: Use intersection + // Method 2: Use new intersection based method KeyRangeMap krMap; - std::map::iterator beginKey = rangeToApplier.begin(); - std::map::iterator endKey = std::next(beginKey, 1); - while (endKey != rangeToApplier.end()) { - TraceEvent("KeyRangeMap") - .detail("BeginKey", beginKey->first) - .detail("EndKey", endKey->first) - .detail("Node", beginKey->second); - krMap.insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second); - beginKey = endKey; - endKey++; - } - if (beginKey != rangeToApplier.end()) { - TraceEvent("KeyRangeMap") - .detail("BeginKey", beginKey->first) - .detail("EndKey", normalKeys.end) - .detail("Node", beginKey->second); - krMap.insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second); - } + buildApplierRangeMap(&krMap, &rangeToApplier); + MutationsVec mvector2; + Standalone> nodeIDs2; + splitMutation(krMap, mutation, mvector2.arena(), mvector2.contents(), nodeIDs2.arena(), nodeIDs2.contents()); + ASSERT(mvector2.size() == nodeIDs2.size()); + + ASSERT(mvector.size() == mvector2.size()); int splitMutationIndex = 0; - auto r = krMap.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2)); - bool correctResult = true; - for (auto i = r.begin(); i != r.end(); ++i) { - // intersectionRange result - // Calculate the overlap range - KeyRef rangeBegin = mutation.param1 > i->range().begin ? mutation.param1 : i->range().begin; - KeyRef rangeEnd = mutation.param2 < i->range().end ? mutation.param2 : i->range().end; - KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd)); - UID nodeID = i->value(); - // splitMuation result - if (splitMutationIndex >= mvector.size()) { - correctResult = false; - break; - } - MutationRef result2M = mvector[splitMutationIndex]; + for (; splitMutationIndex < mvector.size(); splitMutationIndex++) { + MutationRef result = mvector[splitMutationIndex]; + MutationRef result2 = mvector2[splitMutationIndex]; UID applierID = nodeIDs[splitMutationIndex]; - KeyRange krange2(KeyRangeRef(result2M.param1, result2M.param2)); + UID applierID2 = nodeIDs2[splitMutationIndex]; + KeyRange krange(KeyRangeRef(result.param1, result.param2)); + KeyRange krange2(KeyRangeRef(result2.param1, result2.param2)); TraceEvent("Result") - .detail("KeyRange1", krange1.toString()) + .detail("KeyRange1", krange.toString()) .detail("KeyRange2", krange2.toString()) - .detail("ApplierID1", nodeID) - .detail("ApplierID2", applierID); - if (krange1 != krange2 || nodeID != applierID) { - correctResult = false; + .detail("ApplierID1", applierID) + .detail("ApplierID2", applierID2); + if (krange != krange2 || applierID != applierID2) { TraceEvent(SevError, "IncorrectResult") .detail("Mutation", mutation.toString()) - .detail("KeyRange1", krange1.toString()) + .detail("KeyRange1", krange.toString()) .detail("KeyRange2", krange2.toString()) - .detail("ApplierID1", nodeID) - .detail("ApplierID2", applierID); - } - splitMutationIndex++; - } - - if (splitMutationIndex != mvector.size()) { - correctResult = false; - TraceEvent(SevError, "SplitMuationTooMany") - .detail("SplitMutationIndex", splitMutationIndex) - .detail("Results", mvector.size()); - for (; splitMutationIndex < mvector.size(); splitMutationIndex++) { - TraceEvent("SplitMuationTooMany") - .detail("SplitMutationIndex", splitMutationIndex) - .detail("Result", mvector[splitMutationIndex].toString()); + .detail("ApplierID1", applierID) + .detail("ApplierID2", applierID2); } } return Void(); } + +} // namespace diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7739935cb7..854969f2be 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -231,7 +231,6 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.txt) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.txt) add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupApiCorrectnessAtomicRestore.txt) - add_fdb_test(TEST_FILES slow/ParallelRestoreTestSplitMutation.txt) # Note that status tests are not deterministic. add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt) add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt) diff --git a/tests/slow/ParallelRestoreTestSplitMutation.txt b/tests/slow/ParallelRestoreTestSplitMutation.txt deleted file mode 100644 index ab28b2e5bc..0000000000 --- a/tests/slow/ParallelRestoreTestSplitMutation.txt +++ /dev/null @@ -1,7 +0,0 @@ -testTitle=UnitTests -useDB=false -startDelay=0 - - testName=UnitTests - maxTestCases=0 - testsMatching=/FastRestore/RestoreLoader/splitMutation