FastRestore:Applier:Keep incompleteStagingKeys content before values are applied to DB

To avoid the incompleteStagingKeys is cleared before  getAndComputeStagingKeys() finish using it.
This commit is contained in:
Meng Xu 2020-04-04 22:38:02 -07:00
parent a81ec332a9
commit 432c99afd0
2 changed files with 20 additions and 2 deletions

View File

@ -250,6 +250,15 @@ struct RestoreAsset {
bool isInVersionRange(Version commitVersion) const {
return commitVersion >= beginVersion && commitVersion < endVersion;
}
// Is mutation's begin and end keys are in RestoreAsset's range
bool isInKeyRange(MutationRef mutation) const {
if (mutation.type == MutationRef::ClearRange) {
return mutation.param1 >= range.begin && mutation.param2 <= range.end;
} else {
return mutation.param1 >= range.begin && mutation.param1 < range.end;
}
}
};
struct LoadingParam {

View File

@ -151,6 +151,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
batchData->counters.receivedAtomicOps += isAtomicOp((MutationRef::Type)mutation.type) ? 1 : 0;
// Sanity check
if (g_network->isSimulated()) {
// TODO: Use asset.isInKeyRange();
if (isRangeMutation(mutation)) {
ASSERT(mutation.param1 >= req.asset.range.begin &&
mutation.param2 <= req.asset.range.end); // Range mutation's right side is exclusive
@ -278,6 +279,11 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
double curTxnSize = 0;
for (auto& rangeMutation : batchData->stagingKeyRanges) {
KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2);
if (debugMutation("FastRestoreApplierPrecomputeMutationsResultClearRange", rangeMutation.version.version, MutationRef(MutationRef::ClearRange, range.begin, range.end))) {
TraceEvent("FastRestoreApplierPrecomputeMutationsResultClearRange")
.detail("Version", rangeMutation.version.version)
.detail("Sub", rangeMutation.version.sub);
}
clearRanges.push_back(clearRanges.arena(), range);
curTxnSize += range.expectedSize();
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
@ -306,6 +312,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
.detail("ClearRangeUpperBound", rangeMutation.mutation.param2)
.detail("UsedUpperBound", ub->first);
}
// Q: Can beginKey = endKey and clear beginKey?
MutationRef clearKey(MutationRef::ClearRange, lb->first, lb->first);
lb->second.add(clearKey, rangeMutation.version);
lb++;
@ -320,7 +327,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
// Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value
std::vector<Future<Void>> fGetAndComputeKeys;
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys;
std::vector<std::map<Key, std::map<Key, StagingKey>::iterator>> incompleteStagingKeysBuf(1);
std::map<Key, std::map<Key, StagingKey>::iterator>& incompleteStagingKeys = incompleteStagingKeysBuf.back();
std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin();
int numKeysInBatch = 0;
for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) {
@ -332,7 +340,8 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) {
fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, cx, applierID));
numKeysInBatch = 0;
incompleteStagingKeys.clear();
incompleteStagingKeysBuf.push_back(std::map<Key, std::map<Key, StagingKey>::iterator>());
incompleteStagingKeys = incompleteStagingKeysBuf.back();
}
}
if (numKeysInBatch > 0) {