FastRestore:Applier:Handle CompareAndClear atomicOp

This commit is contained in:
Meng Xu 2020-02-13 11:06:20 -08:00
parent d3c01763d9
commit 58dad5373b
2 changed files with 13 additions and 7 deletions

View File

@ -778,14 +778,15 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
return Void();
}
// Copy from WriteDuringRead.actor.cpp
// Copy from WriteDuringRead.actor.cpp with small modifications
// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue,
Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type) {
Arena arena;
if (type == MutationRef::AddValue)
return doLittleEndianAdd(existingValue, value, arena);
else if (type == MutationRef::AppendIfFits)
return doAppendIfFits(existingValue, value, arena);
else if (type == MutationRef::And)
else if (type == MutationRef::And || type == MutationRef::AndV2)
return doAndV2(existingValue, value, arena);
else if (type == MutationRef::Or)
return doOr(existingValue, value, arena);
@ -793,14 +794,12 @@ Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef:
return doXor(existingValue, value, arena);
else if (type == MutationRef::Max)
return doMax(existingValue, value, arena);
else if (type == MutationRef::Min)
else if (type == MutationRef::Min || type == MutationRef::MinV2)
return doMinV2(existingValue, value, arena);
else if (type == MutationRef::ByteMin)
return doByteMin(existingValue, value, arena);
else if (type == MutationRef::ByteMax)
return doByteMax(existingValue, value, arena);
else if (type == MutationRef::CompareAndClear)
return doCompareAndClear(existingValue, value, arena);
else {
TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("TypeCode", (int) type).detail("TypeName", typeString[type]);
ASSERT(false);

View File

@ -116,9 +116,17 @@ struct StagingKey {
continue;
}
for (auto& mutation : lb->second) {
if (isAtomicOp((MutationRef::Type) mutation.type)) {
if (type == MutationRef::CompareAndClear) { // Special atomicOp
Optional<Value> retVal = doCompareAndClear(existingValue, value, arena);
if (!retVal.present()) {
val = key;
type = MutationRef::ClearRange;
} // else no-op
} else if (isAtomicOp((MutationRef::Type) mutation.type)) {
val = applyAtomicOp(val, mutation.param2, (MutationRef::Type)mutation.type);
type = MutationRef::SetValue; // Precomputed result should be set to DB.
} else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) {
type = MutationRef::SetValue; // Precomputed result should be set to DB.
TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet")
.detail("Type", typeString[mutation.type])
.detail("Version", lb->first);
@ -129,7 +137,6 @@ struct StagingKey {
}
}
version = lb->first;
type = MutationRef::SetValue; // Precomputed result should be set to DB.
}
}