Use KeyRangeMap for splitMutation

This simplifies code by reuse KeyRangeMap class.
This commit is contained in:
Jingyu Zhou 2020-07-08 09:19:05 -07:00
parent 82862c6b1a
commit a5af214861
3 changed files with 113 additions and 120 deletions

View File

@ -37,8 +37,8 @@
typedef std::map<Standalone<StringRef>, std::pair<Standalone<StringRef>, uint32_t>> SerializedMutationListMap;
std::vector<UID> getApplierIDs(std::map<Key, UID>& rangeToApplier);
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void splitMutation(KeyRangeMap<UID>& krMap, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs);
void _parseSerializedMutation(KeyRangeMap<Version>* pRangeVersions,
std::map<LoadingParam, VersionedMutationsMap>::iterator kvOpsIter,
SerializedMutationListMap* mutationMap,
@ -473,6 +473,19 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
return Void();
}
void buildApplierRangeMap(KeyRangeMap<UID>* krMap, std::map<Key, UID>* pRangeToApplier) {
std::map<Key, UID>::iterator beginKey = pRangeToApplier->begin();
std::map<Key, UID>::iterator endKey = std::next(beginKey, 1);
while (endKey != pRangeToApplier->end()) {
krMap->insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second);
beginKey = endKey;
endKey++;
}
if (beginKey != pRangeToApplier->end()) {
krMap->insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second);
}
}
// Assume: kvOps data are from the same RestoreAsset.
// Input: pkvOps: versioned kv mutation for the asset in the version batch (batchIndex)
// isRangeFile: is pkvOps from range file? Let receiver (applier) know if the mutation is log mutation;
@ -517,6 +530,8 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
for (auto& applierID : applierIDs) {
applierVersionedMutationsBuffer[applierID] = VersionedMutationsVec();
}
KeyRangeMap<UID> krMap;
buildApplierRangeMap(&krMap, pRangeToApplier);
for (kvOp = kvOps.begin(); kvOp != kvOps.end(); kvOp++) {
commitVersion = kvOp->first;
ASSERT(commitVersion.version >= asset.beginVersion);
@ -529,8 +544,7 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
Standalone<VectorRef<UID>> nodeIDs;
// Because using a vector of mutations causes overhead, and the range mutation should happen rarely;
// We handle the range mutation and key mutation differently for the benefit of avoiding memory copy
splitMutation(pRangeToApplier, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(),
nodeIDs.contents());
splitMutation(krMap, kvm, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
if (MUTATION_TRACKING_ENABLED) {
@ -625,59 +639,21 @@ ACTOR Future<Void> sendMutationsToApplier(VersionedMutationsMap* pkvOps, int bat
return Void();
}
void splitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
// Splits a clear range mutation for Appliers and puts results of splitted mutations and
// Applier IDs into "mvector" and "nodeIDs" on return.
void splitMutation(KeyRangeMap<UID>& krMap, MutationRef m, Arena& mvector_arena, VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
TraceEvent(SevDebug, "FastRestoreSplitMutation").detail("Mutation", m.toString());
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
std::map<Key, UID>::iterator itlow, itup; // we will return [itlow, itup)
itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if (itlow == pRangeToApplier->end()) {
--itlow;
mvector.push_back_deep(mvector_arena, m);
nodeIDs.push_back(nodeIDs_arena, itlow->second);
return;
}
if (itlow->first > m.param1) {
if (itlow != pRangeToApplier->begin()) {
--itlow;
}
}
itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2.
ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2);
std::map<Key, UID>::iterator itApplier;
while (itlow != itup) {
Standalone<MutationRef> curm; // current mutation
curm.type = m.type;
// The first split mutation should starts with m.first.
// The later ones should start with the rangeToApplier boundary.
if (m.param1 > itlow->first) {
curm.param1 = m.param1;
} else {
curm.param1 = itlow->first;
}
itApplier = itlow;
itlow++;
if (itlow == itup) {
ASSERT(m.param2 <= normalKeys.end);
curm.param2 = m.param2;
} else if (m.param2 < itlow->first) {
UNREACHABLE();
curm.param2 = m.param2;
} else {
curm.param2 = itlow->first;
}
ASSERT(curm.param1 <= curm.param2);
// itup > m.param2: (itup-1) may be out of mutation m's range
// Ensure the added mutations have overlap with mutation m
if (m.param1 < curm.param2 && m.param2 > curm.param1) {
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
}
auto r = krMap.intersectingRanges(KeyRangeRef(m.param1, m.param2));
for (auto i = r.begin(); i != r.end(); ++i) {
// Calculate the overlap range
KeyRef rangeBegin = m.param1 > i->range().begin ? m.param1 : i->range().begin;
KeyRef rangeEnd = m.param2 < i->range().end ? m.param2 : i->range().end;
KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd));
mvector.push_back_deep(mvector_arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
nodeIDs.push_back(nodeIDs_arena, i->value());
}
}
@ -1007,6 +983,63 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
return Void();
}
namespace {
void oldSplitMutation(std::map<Key, UID>* pRangeToApplier, MutationRef m, Arena& mvector_arena,
VectorRef<MutationRef>& mvector, Arena& nodeIDs_arena, VectorRef<UID>& nodeIDs) {
// mvector[i] should be mapped to nodeID[i]
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
// key range [m->param1, m->param2)
std::map<Key, UID>::iterator itlow, itup; // we will return [itlow, itup)
itlow = pRangeToApplier->lower_bound(m.param1); // lower_bound returns the iterator that is >= m.param1
if (itlow == pRangeToApplier->end()) {
--itlow;
mvector.push_back_deep(mvector_arena, m);
nodeIDs.push_back(nodeIDs_arena, itlow->second);
return;
}
if (itlow->first > m.param1) {
if (itlow != pRangeToApplier->begin()) {
--itlow;
}
}
itup = pRangeToApplier->upper_bound(m.param2); // return rmap::end if no key is after m.param2.
ASSERT(itup == pRangeToApplier->end() || itup->first > m.param2);
std::map<Key, UID>::iterator itApplier;
while (itlow != itup) {
Standalone<MutationRef> curm; // current mutation
curm.type = m.type;
// The first split mutation should starts with m.first.
// The later ones should start with the rangeToApplier boundary.
if (m.param1 > itlow->first) {
curm.param1 = m.param1;
} else {
curm.param1 = itlow->first;
}
itApplier = itlow;
itlow++;
if (itlow == itup) {
ASSERT(m.param2 <= normalKeys.end);
curm.param2 = m.param2;
} else if (m.param2 < itlow->first) {
UNREACHABLE();
curm.param2 = m.param2;
} else {
curm.param2 = itlow->first;
}
ASSERT(curm.param1 <= curm.param2);
// itup > m.param2: (itup-1) may be out of mutation m's range
// Ensure the added mutations have overlap with mutation m
if (m.param1 < curm.param2 && m.param2 > curm.param1) {
mvector.push_back_deep(mvector_arena, curm);
nodeIDs.push_back(nodeIDs_arena, itApplier->second);
}
}
}
// Test splitMutation
TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
std::map<Key, UID> rangeToApplier;
@ -1028,77 +1061,45 @@ TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
Key endK = k1 < k2 ? k2 : k1;
Standalone<MutationRef> mutation(MutationRef(MutationRef::ClearRange, beginK.contents(), endK.contents()));
// Method 1: Use splitMutation
splitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(), nodeIDs.contents());
// Method 1: Use old splitMutation
oldSplitMutation(&rangeToApplier, mutation, mvector.arena(), mvector.contents(), nodeIDs.arena(),
nodeIDs.contents());
ASSERT(mvector.size() == nodeIDs.size());
// Method 2: Use intersection
// Method 2: Use new intersection based method
KeyRangeMap<UID> krMap;
std::map<Key, UID>::iterator beginKey = rangeToApplier.begin();
std::map<Key, UID>::iterator endKey = std::next(beginKey, 1);
while (endKey != rangeToApplier.end()) {
TraceEvent("KeyRangeMap")
.detail("BeginKey", beginKey->first)
.detail("EndKey", endKey->first)
.detail("Node", beginKey->second);
krMap.insert(KeyRangeRef(beginKey->first, endKey->first), beginKey->second);
beginKey = endKey;
endKey++;
}
if (beginKey != rangeToApplier.end()) {
TraceEvent("KeyRangeMap")
.detail("BeginKey", beginKey->first)
.detail("EndKey", normalKeys.end)
.detail("Node", beginKey->second);
krMap.insert(KeyRangeRef(beginKey->first, normalKeys.end), beginKey->second);
}
buildApplierRangeMap(&krMap, &rangeToApplier);
MutationsVec mvector2;
Standalone<VectorRef<UID>> nodeIDs2;
splitMutation(krMap, mutation, mvector2.arena(), mvector2.contents(), nodeIDs2.arena(), nodeIDs2.contents());
ASSERT(mvector2.size() == nodeIDs2.size());
ASSERT(mvector.size() == mvector2.size());
int splitMutationIndex = 0;
auto r = krMap.intersectingRanges(KeyRangeRef(mutation.param1, mutation.param2));
bool correctResult = true;
for (auto i = r.begin(); i != r.end(); ++i) {
// intersectionRange result
// Calculate the overlap range
KeyRef rangeBegin = mutation.param1 > i->range().begin ? mutation.param1 : i->range().begin;
KeyRef rangeEnd = mutation.param2 < i->range().end ? mutation.param2 : i->range().end;
KeyRange krange1(KeyRangeRef(rangeBegin, rangeEnd));
UID nodeID = i->value();
// splitMuation result
if (splitMutationIndex >= mvector.size()) {
correctResult = false;
break;
}
MutationRef result2M = mvector[splitMutationIndex];
for (; splitMutationIndex < mvector.size(); splitMutationIndex++) {
MutationRef result = mvector[splitMutationIndex];
MutationRef result2 = mvector2[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
KeyRange krange2(KeyRangeRef(result2M.param1, result2M.param2));
UID applierID2 = nodeIDs2[splitMutationIndex];
KeyRange krange(KeyRangeRef(result.param1, result.param2));
KeyRange krange2(KeyRangeRef(result2.param1, result2.param2));
TraceEvent("Result")
.detail("KeyRange1", krange1.toString())
.detail("KeyRange1", krange.toString())
.detail("KeyRange2", krange2.toString())
.detail("ApplierID1", nodeID)
.detail("ApplierID2", applierID);
if (krange1 != krange2 || nodeID != applierID) {
correctResult = false;
.detail("ApplierID1", applierID)
.detail("ApplierID2", applierID2);
if (krange != krange2 || applierID != applierID2) {
TraceEvent(SevError, "IncorrectResult")
.detail("Mutation", mutation.toString())
.detail("KeyRange1", krange1.toString())
.detail("KeyRange1", krange.toString())
.detail("KeyRange2", krange2.toString())
.detail("ApplierID1", nodeID)
.detail("ApplierID2", applierID);
}
splitMutationIndex++;
}
if (splitMutationIndex != mvector.size()) {
correctResult = false;
TraceEvent(SevError, "SplitMuationTooMany")
.detail("SplitMutationIndex", splitMutationIndex)
.detail("Results", mvector.size());
for (; splitMutationIndex < mvector.size(); splitMutationIndex++) {
TraceEvent("SplitMuationTooMany")
.detail("SplitMutationIndex", splitMutationIndex)
.detail("Result", mvector[splitMutationIndex].toString());
.detail("ApplierID1", applierID)
.detail("ApplierID2", applierID2);
}
}
return Void();
}
} // namespace

View File

@ -231,7 +231,6 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupWriteDuringReadAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreOldBackupApiCorrectnessAtomicRestore.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreTestSplitMutation.txt)
# Note that status tests are not deterministic.
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)

View File

@ -1,7 +0,0 @@
testTitle=UnitTests
useDB=false
startDelay=0
testName=UnitTests
maxTestCases=0
testsMatching=/FastRestore/RestoreLoader/splitMutation