FastRestore:Fix clearrange on a key mistakenly clear other keys

This commit is contained in:
Meng Xu 2020-03-31 17:45:19 -07:00
parent 212dadc2a1
commit 25e96a13d3
3 changed files with 10 additions and 4 deletions

View File

@ -304,7 +304,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
.detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
.detail("UsedUpperBound", ub->first);
}
lb->second.add(rangeMutation.mutation, rangeMutation.version);
MutationRef clearKey(MutationRef::ClearRange, lb->first, lb->first);
lb->second.add(clearKey, rangeMutation.version);
lb++;
}
}

View File

@ -79,6 +79,10 @@ struct StagingKey {
// newVersion can be smaller than version as different loaders can send
// mutations out of order.
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
if (m.type == MutationRef::ClearRange) {
// We should only clear this key! Otherwise, it causes side effect to other keys
ASSERT(m.param1 == m.param2);
}
if (version < newVersion) {
if (debugMutation("StagingKeyAdd", newVersion.version, m)) {
TraceEvent("StagingKeyAdd")
@ -122,9 +126,10 @@ struct StagingKey {
.detail("LargestPendingVersion",
(pendingMutations.empty() ? "[none]" : pendingMutations.rbegin()->first.toString()));
std::map<LogMessageVersion, Standalone<MutationRef>>::iterator lb = pendingMutations.lower_bound(version);
if (lb == pendingMutations.end()) {
if (lb == pendingMutations.end()) { //pendingMutations.empty() ||
return;
}
ASSERT(!pendingMutations.empty());
if (lb->first == version) {
// Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered
MutationRef m = lb->second;

View File

@ -207,9 +207,9 @@ StringRef debugKey2 = LiteralStringRef( "\xff\xff\xff\xff" );
bool debugMutation( const char* context, Version version, MutationRef const& mutation ) {
if ((mutation.type == mutation.SetValue || mutation.type == mutation.AddValue || mutation.type==mutation.DebugKey) && (mutation.param1 == debugKey || mutation.param1 == debugKey2))
;//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "SetValue").detail("Key", mutation.param1).detail("Value", mutation.param2);
TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "SetValue").detail("Key", mutation.param1).detail("Value", mutation.param2);
else if ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) && ((mutation.param1<=debugKey && mutation.param2>debugKey) || (mutation.param1<=debugKey2 && mutation.param2>debugKey2)))
;//TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "ClearRange").detail("KeyBegin", mutation.param1).detail("KeyEnd", mutation.param2);
TraceEvent("MutationTracking").detail("At", context).detail("Version", version).detail("MutationType", "ClearRange").detail("KeyBegin", mutation.param1).detail("KeyEnd", mutation.param2);
else
return false;
const char* type =