FastRestore:Applier:Handle multiple gets in parallel

This commit is contained in:
Meng Xu 2020-02-13 22:17:04 -08:00
parent 53f427c319
commit b57583a504
1 changed files with 47 additions and 37 deletions

View File

@ -200,6 +200,8 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
//state std::vector<std::pair<Key, Future<Optional<Value>>>> fKVs;
state std::vector<Future<Optional<Value>>> fValues;
state std::vector<Optional<Value>> values;
state int i = 0;
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
loop {
@ -212,7 +214,10 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
//fValues.push_back(fKVs.back().second);
fValues.push_back(tr->get(key.first));
}
wait(waitForAll(fValues));
for(i = 0; i < fValues.size(); i++) {
Optional<Value> val = wait(fValues[i]);
values.push_back(val);
}
break;
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
@ -224,13 +229,18 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
}
}
ASSERT(fValues.size() == imcompleteStagingKeys.size());
ASSERT(values.size() == imcompleteStagingKeys.size());
// TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map
int i = 0;
for(auto& key : imcompleteStagingKeys) {
if (!fValues[i].get().present()) {
if (!values[i].present()) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB")
.detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type);
for(auto& vm : key.second->second.pendingMutations) {
for(auto& m : vm.second) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("PendingMutationVersion", vm.first).detail("PendingMutation", m.toString());
}
}
continue;
}
// The key's version ideally should be the most recently committed version.
@ -303,42 +313,42 @@ ACTOR static Future<Void> getAndComputeStagingKeys(
// }
ACTOR static Future<Void> getAndComputeStagingKeysV3(
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
// ACTOR static Future<Void> getAndComputeStagingKeysV3(
// std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
// .detail("GetKeys", imcompleteStagingKeys.size());
try {
for (auto& key : imcompleteStagingKeys) {
Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr->get(key.first));
if (val.present()) {
// The key's version ideally should be the most recently committed version.
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, key.first, val.get());
key.second->second.add(m, (Version)1);
key.second->second.precomputeResult();
} else {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB")
.detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type);
}
}
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", imcompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
wait(tr->onError(e));
}
// try {
// for (auto& key : imcompleteStagingKeys) {
// Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// tr->reset();
// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// Optional<Value> val = wait(tr->get(key.first));
// if (val.present()) {
// // The key's version ideally should be the most recently committed version.
// // But as long as it is > 1 and less than the start version of the version batch, it is the same result.
// MutationRef m(MutationRef::SetValue, key.first, val.get());
// key.second->second.add(m, (Version)1);
// key.second->second.precomputeResult();
// } else {
// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB")
// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type);
// }
// }
// } catch (Error& e) {
// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
// .detail("GetKeys", imcompleteStagingKeys.size())
// .detail("Error", e.what())
// .detail("ErrorCode", e.code());
// wait(tr->onError(e));
// }
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
// .detail("GetKeys", imcompleteStagingKeys.size());
return Void();
}
// return Void();
// }
ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData> batchData, UID applierID,
int64_t batchIndex, Database cx) {
@ -396,7 +406,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
}
}
Future<Void> fGetAndComputeKeys = getAndComputeStagingKeysV3(imcompleteStagingKeys, cx, applierID);
Future<Void> fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID);
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)