FastRestore:Applier:fix getAndComputeStagingKeys

This commit is contained in:
Meng Xu 2020-02-13 21:37:40 -08:00
parent 0d668ea0c3
commit 53f427c319
1 changed files with 135 additions and 29 deletions

View File

@ -198,42 +198,140 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
ACTOR static Future<Void> getAndComputeStagingKeys(
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state std::vector<std::pair<Key, Future<Optional<Value>>>> fKVs;
std::vector<Future<Optional<Value>>> fValues;
//state std::vector<std::pair<Key, Future<Optional<Value>>>> fKVs;
state std::vector<Future<Optional<Value>>> fValues;
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : imcompleteStagingKeys) {
fKVs.push_back(std::make_pair(key.first, tr->get(key.first)));
fValues.push_back(fKVs.back().second);
loop {
try {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
for (auto& key : imcompleteStagingKeys) {
//fKVs.push_back(std::make_pair(key.first, tr->get(key.first)));
//fValues.push_back(fKVs.back().second);
fValues.push_back(tr->get(key.first));
}
wait(waitForAll(fValues));
break;
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", imcompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
wait(tr->onError(e));
fValues.clear();
}
wait(waitForAll(fValues));
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", imcompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
wait(tr->onError(e));
}
ASSERT(fKVs.size() == imcompleteStagingKeys.size());
ASSERT(fValues.size() == imcompleteStagingKeys.size());
// TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map
for (auto& kv : fKVs) {
if (!kv.second.get().present()) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("ValueNotPresent", kv.first);
int i = 0;
for(auto& key : imcompleteStagingKeys) {
if (!fValues[i].get().present()) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB")
.detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type);
continue;
} else {
std::map<Key, StagingKey>::iterator iter = imcompleteStagingKeys[kv.first];
// The key's version ideally should be the most recently committed version.
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get());
iter->second.add(m, (Version)1);
iter->second.precomputeResult();
}
// The key's version ideally should be the most recently committed version.
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get());
key.second->second.add(m, (Version)1);
key.second->second.precomputeResult();
i++;
}
// for (auto& kv : fKVs) {
// if (!kv.second.get().present()) {
// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
// .detail("ValueNotPresent", kv.first);
// continue;
// } else {
// std::map<Key, StagingKey>::iterator iter = imcompleteStagingKeys[kv.first];
// // The key's version ideally should be the most recently committed version.
// // But as long as it is > 1 and less than the start version of the version batch, it is the same result.
// MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get());
// iter->second.add(m, (Version)1);
// iter->second.precomputeResult();
// }
// }
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
return Void();
}
// ACTOR static Future<Void> getAndComputeStagingKeysV2(
// std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
// state std::vector<Future<Void> fRets;
// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
// .detail("GetKeys", imcompleteStagingKeys.size());
// try {
// for (auto& key : imcompleteStagingKeys) {
// Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// tr->reset();
// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
// tr->setOption(FDBTransactionOptions::LOCK_AWARE);
// fRets.push_back(map(tr->get(key.first), [](Optional<Value> const &val) -> Void() {
// if (val.present()) {
// // The key's version ideally should be the most recently committed version.
// // But as long as it is > 1 and less than the start version of the version batch, it is the same result.
// MutationRef m(MutationRef::SetValue, key.first, val.get());
// key.second->second.add(m, (Version)1);
// key.second->second.precomputeResult();
// } else {
// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB")
// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type);
// }
// }) );
// }
// wait(waitForAll(fRets));
// } catch (Error& e) {
// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
// .detail("GetKeys", imcompleteStagingKeys.size())
// .detail("Error", e.what())
// .detail("ErrorCode", e.code());
// wait(tr->onError(e));
// }
// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
// .detail("GetKeys", imcompleteStagingKeys.size());
// return Void();
// }
ACTOR static Future<Void> getAndComputeStagingKeysV3(
std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) {
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID)
.detail("GetKeys", imcompleteStagingKeys.size());
try {
for (auto& key : imcompleteStagingKeys) {
Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> val = wait(tr->get(key.first));
if (val.present()) {
// The key's version ideally should be the most recently committed version.
// But as long as it is > 1 and less than the start version of the version batch, it is the same result.
MutationRef m(MutationRef::SetValue, key.first, val.get());
key.second->second.add(m, (Version)1);
key.second->second.precomputeResult();
} else {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB")
.detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type);
}
}
} catch (Error& e) {
TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError")
.detail("GetKeys", imcompleteStagingKeys.size())
.detail("Error", e.what())
.detail("ErrorCode", e.code());
wait(tr->onError(e));
}
TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID)
@ -298,7 +396,7 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
}
}
Future<Void> fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID);
Future<Void> fGetAndComputeKeys = getAndComputeStagingKeysV3(imcompleteStagingKeys, cx, applierID);
TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID)
.detail("BatchIndex", batchIndex)
@ -330,6 +428,8 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB));
state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int sets = 0;
state int clears = 0;
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first);
loop {
try {
@ -340,13 +440,19 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
while (iter != end) {
if (iter->second.type == MutationRef::SetValue) {
tr->set(iter->second.key, iter->second.val);
sets++;
} else if (iter->second.type == MutationRef::ClearRange) {
tr->clear(KeyRangeRef(iter->second.key, iter->second.val));
clears++;
} else {
ASSERT(false);
}
iter++;
if (sets > 10000000 || clears > 10000000) {
TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears);
}
}
TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears);
wait(tr->commit());
break;
} catch (Error& e) {