RestoreApplier:Better handling of key not exist

This commit is contained in:
Meng Xu 2020-06-07 20:35:07 -07:00
parent 94be3afcf8
commit 8c81fedf11
1 changed files with 22 additions and 23 deletions

View File

@ -213,6 +213,21 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
return Void();
}
ACTOR Future<Optional<Value>> getValue(Reference<ReadYourWritesTransaction> tr, Key key, int i,
std::set<int>* keysNotFound) {
try {
Optional<Value> v = wait(tr->get(key));
return v;
} catch (Error& e) {
if (e.code() == error_code_key_not_found) {
keysNotFound->insert(i);
return Optional<Value>();
} else {
throw;
}
}
}
// Get keys in incompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys
ACTOR static Future<Void> getAndComputeStagingKeys(
std::map<Key, std::map<Key, StagingKey>::iterator> incompleteStagingKeys, double delayTime, Database cx,
@ -231,46 +246,30 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
state std::set<int> keysNotFound;
state int i = 0;
state bool hasError = false;
loop {
hasError = false;
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
i = 0;
for (auto& key : incompleteStagingKeys) {
if (!fValues[i].isReady() || !keysNotFound.count(i)) {
fValues[i] = tr->get(key.first);
if (!keysNotFound.count(i)) { // only get exist-keys
fValues[i] = getValue(tr, key.first, i, &keysNotFound);
}
++i;
}
for (i = 0; i < incompleteStagingKeys.size(); ++i) {
if (keysNotFound.count(i)) {
continue;
}
wait(success(fValues[i])); // NOTE: This may be waiting for ever!
}
wait(waitForAll(fValues));
break;
} catch (Error& e) {
if (e.code() == error_code_key_not_found) { // e.code() == error_code_transaction_too_old || e.code() ==
// error_code_future_version
keysNotFound.insert(i);
} else {
hasError = true;
}
if (retries > incompleteStagingKeys.size()) {
TraceEvent(SevError, "GetAndComputeStagingKeys", applierID)
bool ok = (e.code() != error_code_key_not_found);
if (!ok || retries++ > incompleteStagingKeys.size()) {
TraceEvent(!ok ? SevError : SevWarnAlways, "GetAndComputeStagingKeys", applierID)
.detail("BatchIndex", batchIndex)
.detail("KeyIndex", i)
.error(e);
}
wait(tr->onError(e));
}
if (!hasError) {
break;
}
retries++;
}
ASSERT(fValues.size() == incompleteStagingKeys.size());