FastRestore:Applier:Handle version stamped key values
This commit is contained in:
parent
b008df97eb
commit
d3c01763d9
|
@ -28,11 +28,11 @@ static ValueRef doLittleEndianAdd(const Optional<ValueRef>& existingValueOptiona
|
|||
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
|
||||
if(!existingValue.size()) return otherOperand;
|
||||
if(!otherOperand.size()) return otherOperand;
|
||||
|
||||
|
||||
uint8_t* buf = new (ar) uint8_t [otherOperand.size()];
|
||||
int i = 0;
|
||||
int carry = 0;
|
||||
|
||||
|
||||
for(i = 0; i<std::min(existingValue.size(), otherOperand.size()); i++) {
|
||||
int sum = existingValue[i] + otherOperand[i] + carry;
|
||||
buf[i] = sum;
|
||||
|
@ -44,16 +44,16 @@ static ValueRef doLittleEndianAdd(const Optional<ValueRef>& existingValueOptiona
|
|||
carry = sum >> 8;
|
||||
}
|
||||
|
||||
return StringRef(buf, i);
|
||||
return StringRef(buf, i);
|
||||
}
|
||||
|
||||
static ValueRef doAnd(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
|
||||
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
|
||||
if(!otherOperand.size()) return otherOperand;
|
||||
|
||||
|
||||
uint8_t* buf = new (ar) uint8_t [otherOperand.size()];
|
||||
int i = 0;
|
||||
|
||||
|
||||
for(i = 0; i<std::min(existingValue.size(), otherOperand.size()); i++)
|
||||
buf[i] = existingValue[i] & otherOperand[i];
|
||||
for(; i<otherOperand.size(); i++)
|
||||
|
@ -76,7 +76,7 @@ static ValueRef doOr(const Optional<ValueRef>& existingValueOptional, const Valu
|
|||
|
||||
uint8_t* buf = new (ar) uint8_t [otherOperand.size()];
|
||||
int i = 0;
|
||||
|
||||
|
||||
for(i = 0; i<std::min(existingValue.size(), otherOperand.size()); i++)
|
||||
buf[i] = existingValue[i] | otherOperand[i];
|
||||
for(; i<otherOperand.size(); i++)
|
||||
|
@ -89,10 +89,10 @@ static ValueRef doXor(const Optional<ValueRef>& existingValueOptional, const Val
|
|||
const ValueRef& existingValue = existingValueOptional.present() ? existingValueOptional.get() : StringRef();
|
||||
if(!existingValue.size()) return otherOperand;
|
||||
if(!otherOperand.size()) return otherOperand;
|
||||
|
||||
|
||||
uint8_t* buf = new (ar) uint8_t [otherOperand.size()];
|
||||
int i = 0;
|
||||
|
||||
|
||||
for(i = 0; i<std::min(existingValue.size(), otherOperand.size()); i++)
|
||||
buf[i] = existingValue[i] ^ otherOperand[i];
|
||||
|
||||
|
@ -212,7 +212,7 @@ static ValueRef doMinV2(const Optional<ValueRef>& existingValueOptional, const V
|
|||
|
||||
static ValueRef doByteMin(const Optional<ValueRef>& existingValueOptional, const ValueRef& otherOperand, Arena& ar) {
|
||||
if (!existingValueOptional.present()) return otherOperand;
|
||||
|
||||
|
||||
const ValueRef& existingValue = existingValueOptional.get();
|
||||
if (existingValue < otherOperand)
|
||||
return existingValue;
|
||||
|
|
|
@ -123,6 +123,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersioned
|
|||
if (curFilePos.get() == req.prevVersion) {
|
||||
isDuplicated = false;
|
||||
Version commitVersion = req.version;
|
||||
uint16_t numVersionStampedKV = 0;
|
||||
MutationsVec mutations(req.mutations);
|
||||
// Sanity check: mutations in range file is in [beginVersion, endVersion);
|
||||
// mutations in log file is in [beginVersion, endVersion], both inclusive.
|
||||
|
@ -153,7 +154,12 @@ ACTOR static Future<Void> handleSendMutationVectorRequestV2(RestoreSendVersioned
|
|||
}
|
||||
}
|
||||
// Note: Log and range mutations may be delivered out of order. Can we handle it?
|
||||
batchData->addMutation(mutation, commitVersion);
|
||||
if (mutation.type == MutationRef::SetVersionstampedKey || mutation.type == MutationRef::SetVersionstampedValue) {
|
||||
batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV);
|
||||
numVersionStampedKV++;
|
||||
} else {
|
||||
batchData->addMutation(mutation, commitVersion);
|
||||
}
|
||||
}
|
||||
curFilePos.set(req.version);
|
||||
}
|
||||
|
@ -775,9 +781,7 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
// Copy from WriteDuringRead.actor.cpp
|
||||
Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type) {
|
||||
Arena arena;
|
||||
if (type == MutationRef::SetValue)
|
||||
return value;
|
||||
else if (type == MutationRef::AddValue)
|
||||
if (type == MutationRef::AddValue)
|
||||
return doLittleEndianAdd(existingValue, value, arena);
|
||||
else if (type == MutationRef::AppendIfFits)
|
||||
return doAppendIfFits(existingValue, value, arena);
|
||||
|
@ -795,8 +799,10 @@ Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef:
|
|||
return doByteMin(existingValue, value, arena);
|
||||
else if (type == MutationRef::ByteMax)
|
||||
return doByteMax(existingValue, value, arena);
|
||||
else if (type == MutationRef::CompareAndClear)
|
||||
return doCompareAndClear(existingValue, value, arena);
|
||||
else {
|
||||
TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("Type", (int) type).detail("TypeName", typeString[type]);
|
||||
TraceEvent(SevError, "ApplyAtomicOpUnhandledType").detail("TypeCode", (int) type).detail("TypeName", typeString[type]);
|
||||
ASSERT(false);
|
||||
}
|
||||
return Value();
|
||||
|
|
|
@ -52,7 +52,10 @@ struct StagingKey {
|
|||
|
||||
explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {}
|
||||
|
||||
// Add mutation m at newVersion to stagingKey
|
||||
// Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set
|
||||
void add(const MutationRef& m, Version newVersion) {
|
||||
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
|
||||
if (version < newVersion) {
|
||||
if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) {
|
||||
key = m.param1;
|
||||
|
@ -209,6 +212,22 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
}
|
||||
}
|
||||
|
||||
void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) {
|
||||
if (m.type == MutationRef::SetVersionstampedKey) {
|
||||
// Assume transactionNumber = 0 does not affect result
|
||||
TraceEvent(SevDebug, "FastRestoreApplierAddMutation").detail("MutationType", typeString[m.type]).detail("FakedTransactionNumber", numVersionStampedKV);
|
||||
transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV);
|
||||
addMutation(m, ver);
|
||||
} else if (m.type == MutationRef::SetVersionstampedValue) {
|
||||
// Assume transactionNumber = 0 does not affect result
|
||||
TraceEvent(SevDebug, "FastRestoreApplierAddMutation").detail("MutationType", typeString[m.type]).detail("FakedTransactionNumber", numVersionStampedKV);
|
||||
transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV);
|
||||
addMutation(m, ver);
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
||||
// Return true if all staging keys have been precomputed
|
||||
bool allKeysPrecomputed() {
|
||||
for (auto& stagingKey : stagingKeys) {
|
||||
|
|
Loading…
Reference in New Issue