From 53f427c319225aaff08ad56fb991dd4d3c2d4913 Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Thu, 13 Feb 2020 21:37:40 -0800 Subject: [PATCH] FastRestore:Applier:fix getAndComputeStagingKeys --- fdbserver/RestoreApplier.actor.cpp | 164 ++++++++++++++++++++++++----- 1 file changed, 135 insertions(+), 29 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index db70a9d122..bbfddd9c2a 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -198,42 +198,140 @@ ACTOR static Future applyClearRangeMutations(Standalone getAndComputeStagingKeys( std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { state Reference tr(new ReadYourWritesTransaction(cx)); - state std::vector>>> fKVs; - std::vector>> fValues; + //state std::vector>>> fKVs; + state std::vector>> fValues; TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) .detail("GetKeys", imcompleteStagingKeys.size()); - try { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - for (auto& key : imcompleteStagingKeys) { - fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); - fValues.push_back(fKVs.back().second); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + for (auto& key : imcompleteStagingKeys) { + //fKVs.push_back(std::make_pair(key.first, tr->get(key.first))); + //fValues.push_back(fKVs.back().second); + fValues.push_back(tr->get(key.first)); + } + wait(waitForAll(fValues)); + break; + } catch (Error& e) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); + wait(tr->onError(e)); + fValues.clear(); } - wait(waitForAll(fValues)); - } catch (Error& e) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - .detail("GetKeys", imcompleteStagingKeys.size()) - .detail("Error", e.what()) - .detail("ErrorCode", e.code()); - wait(tr->onError(e)); } - ASSERT(fKVs.size() == imcompleteStagingKeys.size()); + ASSERT(fValues.size() == imcompleteStagingKeys.size()); // TODO: Optimize the performance by reducing map lookup: making getKey's future a field in the input map - for (auto& kv : fKVs) { - if (!kv.second.get().present()) { - TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") - .detail("ValueNotPresent", kv.first); + int i = 0; + for(auto& key : imcompleteStagingKeys) { + if (!fValues[i].get().present()) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") + .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); continue; - } else { - std::map::iterator iter = imcompleteStagingKeys[kv.first]; - // The key's version ideally should be the most recently committed version. - // But as long as it is > 1 and less than the start version of the version batch, it is the same result. - MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get()); - iter->second.add(m, (Version)1); - iter->second.precomputeResult(); } + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get()); + key.second->second.add(m, (Version)1); + key.second->second.precomputeResult(); + i++; + } + // for (auto& kv : fKVs) { + // if (!kv.second.get().present()) { + // TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + // .detail("ValueNotPresent", kv.first); + // continue; + // } else { + // std::map::iterator iter = imcompleteStagingKeys[kv.first]; + // // The key's version ideally should be the most recently committed version. + // // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + // MutationRef m(MutationRef::SetValue, kv.first, kv.second.get().get()); + // iter->second.add(m, (Version)1); + // iter->second.precomputeResult(); + // } + // } + + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + + return Void(); +} + +// ACTOR static Future getAndComputeStagingKeysV2( +// std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { +// state std::vector fRets; +// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) +// .detail("GetKeys", imcompleteStagingKeys.size()); + +// try { +// for (auto& key : imcompleteStagingKeys) { +// Reference tr(new ReadYourWritesTransaction(cx)); +// tr->reset(); +// tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); +// tr->setOption(FDBTransactionOptions::LOCK_AWARE); + +// fRets.push_back(map(tr->get(key.first), [](Optional const &val) -> Void() { +// if (val.present()) { +// // The key's version ideally should be the most recently committed version. +// // But as long as it is > 1 and less than the start version of the version batch, it is the same result. +// MutationRef m(MutationRef::SetValue, key.first, val.get()); +// key.second->second.add(m, (Version)1); +// key.second->second.precomputeResult(); +// } else { +// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") +// .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); +// } +// }) ); +// } +// wait(waitForAll(fRets)); +// } catch (Error& e) { +// TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") +// .detail("GetKeys", imcompleteStagingKeys.size()) +// .detail("Error", e.what()) +// .detail("ErrorCode", e.code()); +// wait(tr->onError(e)); +// } + +// TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) +// .detail("GetKeys", imcompleteStagingKeys.size()); + +// return Void(); +// } + + +ACTOR static Future getAndComputeStagingKeysV3( + std::map::iterator> imcompleteStagingKeys, Database cx, UID applierID) { + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + + try { + for (auto& key : imcompleteStagingKeys) { + Reference tr(new ReadYourWritesTransaction(cx)); + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + Optional val = wait(tr->get(key.first)); + if (val.present()) { + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, key.first, val.get()); + key.second->second.add(m, (Version)1); + key.second->second.precomputeResult(); + } else { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError").detail("Key", key.first).detail("Reason", "Not found in DB") + .detail("PendingMutations", key.second->second.pendingMutations.size()).detail("StagingKeyType", (int) key.second->second.type); + } + } + } catch (Error& e) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); + wait(tr->onError(e)); } TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) @@ -298,7 +396,7 @@ ACTOR static Future precomputeMutationsResult(Reference } } - Future fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID); + Future fGetAndComputeKeys = getAndComputeStagingKeysV3(imcompleteStagingKeys, cx, applierID); TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) .detail("BatchIndex", batchIndex) @@ -330,6 +428,8 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); state Reference tr(new ReadYourWritesTransaction(cx)); + state int sets = 0; + state int clears = 0; TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first); loop { try { @@ -340,13 +440,19 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera while (iter != end) { if (iter->second.type == MutationRef::SetValue) { tr->set(iter->second.key, iter->second.val); + sets++; } else if (iter->second.type == MutationRef::ClearRange) { tr->clear(KeyRangeRef(iter->second.key, iter->second.val)); + clears++; } else { ASSERT(false); } iter++; + if (sets > 10000000 || clears > 10000000) { + TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears); + } } + TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID).detail("Begin", begin->first).detail("Sets", sets).detail("Clears", clears); wait(tr->commit()); break; } catch (Error& e) {