diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 8d8e6cccc5..5382f37c51 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -552,7 +552,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; } init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; } - init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; } + init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; } + init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; } init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; } // clang-format on diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index b8d9a7c4cc..144b5eb142 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -495,6 +495,7 @@ public: int64_t FASTRESTORE_ROLE_LOGGING_DELAY; int64_t FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL; // How quickly to update process metrics for restore int64_t FASTRESTORE_ATOMICOP_WEIGHT; // workload amplication factor for atomic op + int64_t FASTRESTORE_APPLYING_PARALLELISM; // number of outstanding txns writing to dest. DB int64_t FASTRESTORE_MONITOR_LEADER_DELAY; ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false); diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index 0a714bd488..e398e0ae8c 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -108,7 +108,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu // Note: Insert new items into processedFileState will not invalidate the reference. state NotifiedVersion& curFilePos = batchData->processedFileState[req.asset]; - TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) + TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutations", self->id()) .detail("BatchIndex", req.batchIndex) .detail("RestoreAsset", req.asset.toString()) .detail("ProcessedFileVersion", curFilePos.get()) @@ -120,20 +120,17 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu if (curFilePos.get() == req.prevVersion) { isDuplicated = false; Version commitVersion = req.version; + uint16_t numVersionStampedKV = 0; MutationsVec mutations(req.mutations); // Sanity check: mutations in range file is in [beginVersion, endVersion); // mutations in log file is in [beginVersion, endVersion], both inclusive. ASSERT_WE_THINK(commitVersion >= req.asset.beginVersion); // Loader sends the endVersion to ensure all useful versions are sent - ASSERT_WE_THINK((req.isRangeFile && commitVersion <= req.asset.endVersion) || - (!req.isRangeFile && commitVersion <= req.asset.endVersion)); + ASSERT_WE_THINK(commitVersion <= req.asset.endVersion); - if (batchData->kvOps.find(commitVersion) == batchData->kvOps.end()) { - batchData->kvOps.insert(std::make_pair(commitVersion, MutationsVec())); - } for (int mIndex = 0; mIndex < mutations.size(); mIndex++) { MutationRef mutation = mutations[mIndex]; - TraceEvent(SevFRMutationInfo, "FastRestore") + TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseReceiveMutations", self->id()) .detail("ApplierNode", self->id()) .detail("RestoreAsset", req.asset.toString()) .detail("Version", commitVersion) @@ -152,313 +149,276 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu ASSERT(mutation.param1 >= req.asset.range.begin && mutation.param1 < req.asset.range.end); } } - batchData->kvOps[commitVersion].push_back_deep(batchData->kvOps[commitVersion].arena(), mutation); - // TODO: What if log file's mutations are delivered out-of-order (behind) the range file's mutations?! + // Note: Log and range mutations may be delivered out of order. Can we handle it? + if (mutation.type == MutationRef::SetVersionstampedKey || + mutation.type == MutationRef::SetVersionstampedValue) { + batchData->addVersionStampedKV(mutation, commitVersion, numVersionStampedKV); + numVersionStampedKV++; + } else { + batchData->addMutation(mutation, commitVersion); + } } curFilePos.set(req.version); } req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); + TraceEvent(SevDebug, "FastRestoreApplierPhaseReceiveMutationsDone", self->id()) + .detail("BatchIndex", req.batchIndex) + .detail("RestoreAsset", req.asset.toString()) + .detail("ProcessedFileVersion", curFilePos.get()) + .detail("Request", req.toString()); return Void(); } -// Progress and checkpoint for applying (atomic) mutations in transactions to DB -struct DBApplyProgress { - // Mutation state in the current uncommitted transaction - VersionedMutationsMap::iterator curItInCurTxn; - int curIndexInCurTxn; - - // Save the starting point for current txn to handle (commit_unknown_result) error in txn commit - // startItInUncommittedTxn is starting iterator in the most recent uncommitted (and failed) txn - // startIndexInUncommittedTxn is start index in the most recent uncommitted (and failed) txn. - // Note: Txns have different number of mutations - VersionedMutationsMap::iterator startItInUncommittedTxn; - int startIndexInUncommittedTxn; - - // State to decide if a txn succeeds or not when txn error (commit_unknown_result) happens; - // curTxnId: The id of the current uncommitted txn, which monotonically increase for each successful transaction - // uncommittedTxnId: The id of the most recent succeeded txn. Used to recover the failed txn id in retry - // lastTxnHasError: Does the last txn has error. TODO: Only need to handle txn_commit_unknown error - Version curTxnId; - Version uncommittedTxnId; - bool lastTxnHasError; - - // Decide when to commit a transaction. We buffer enough mutations in a txn before commit the txn - bool startNextVersion; // The next txn will include mutations in next version - int numAtomicOps; // Status counter - double txnBytes; // Decide when to commit a txn - double txnMutations; // Status counter - - Reference<ApplierBatchData> batchData; - UID applierId; - - DBApplyProgress() = default; - explicit DBApplyProgress(UID applierId, Reference<ApplierBatchData> batchData) - : applierId(applierId), batchData(batchData), curIndexInCurTxn(0), startIndexInUncommittedTxn(0), curTxnId(0), - uncommittedTxnId(0), lastTxnHasError(false), startNextVersion(false), numAtomicOps(0), txnBytes(0), - txnMutations(0) { - curItInCurTxn = batchData->kvOps.begin(); - while (curItInCurTxn != batchData->kvOps.end() && curItInCurTxn->second.empty()) { - curItInCurTxn++; - } - startItInUncommittedTxn = curItInCurTxn; - } - - // Has all mutations been committed? - bool isDone() { return curItInCurTxn == batchData->kvOps.end(); } - - // Set cursor for next mutation - void nextMutation() { - curIndexInCurTxn++; - while (curItInCurTxn != batchData->kvOps.end() && curIndexInCurTxn >= curItInCurTxn->second.size()) { - curIndexInCurTxn = 0; - curItInCurTxn++; - startNextVersion = true; - } - } - - // Setup for the next transaction; This should be done after nextMutation() - void nextTxn() { - txnBytes = 0; - txnMutations = 0; - numAtomicOps = 0; - lastTxnHasError = false; - startNextVersion = false; - - curTxnId++; - - startIndexInUncommittedTxn = curIndexInCurTxn; - startItInUncommittedTxn = curItInCurTxn; - uncommittedTxnId = curTxnId; - } - - // Rollback to the starting point of the uncommitted-and-failed transaction to - // re-execute uncommitted txn - void rollback() { - TraceEvent(SevWarn, "FastRestoreApplyTxnError") - .detail("TxnStatusFailed", curTxnId) - .detail("ApplierApplyToDB", applierId) - .detail("UncommittedTxnId", uncommittedTxnId) - .detail("CurIteratorVersion", curItInCurTxn->first) - .detail("StartIteratorVersionInUncommittedTxn", startItInUncommittedTxn->first) - .detail("CurrentIndexInFailedTxn", curIndexInCurTxn) - .detail("StartIndexInUncommittedTxn", startIndexInUncommittedTxn) - .detail("NumIncludedAtomicOps", numAtomicOps); - curItInCurTxn = startItInUncommittedTxn; - curIndexInCurTxn = startIndexInUncommittedTxn; - curTxnId = uncommittedTxnId; - - numAtomicOps = 0; - txnBytes = 0; - txnMutations = 0; - startNextVersion = false; - lastTxnHasError = false; - } - - bool shouldCommit() { - return (!lastTxnHasError && (startNextVersion || txnBytes >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES || - curItInCurTxn == batchData->kvOps.end())); - } - - bool hasError() { return lastTxnHasError; } - - void setTxnError(Error& e) { - TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError") - .detail("TxnStatus", "?") - .detail("ApplierApplyToDB", applierId) - .detail("TxnId", curTxnId) - .detail("StartIndexInCurrentTxn", curIndexInCurTxn) - .detail("Version", curItInCurTxn->first) - .error(e, true); - lastTxnHasError = true; - } - - MutationRef getCurrentMutation() { - ASSERT_WE_THINK(curIndexInCurTxn < curItInCurTxn->second.size()); - return curItInCurTxn->second[curIndexInCurTxn]; - } -}; - -ACTOR Future<Void> applyToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData, Database cx) { - // state variables must be defined at the start of actor to be initialized in the actor constructor - state std::string typeStr = ""; +// Clear all ranges in input ranges +ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, Database cx) { state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx)); - state DBApplyProgress progress(applierID, batchData); - - TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID) - .detail("BatchIndex", batchIndex) - .detail("FromVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.begin()->first) - .detail("EndVersion", batchData->kvOps.empty() ? -1 : batchData->kvOps.rbegin()->first); - - // Assume the process will not crash when it apply mutations to DB. The reply message can be lost though - if (batchData->kvOps.empty()) { - TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID) - .detail("BatchIndex", batchIndex) - .detail("Reason", "NoMutationAtVersions"); - return Void(); - } - - batchData->sanityCheckMutationOps(); - - if (progress.isDone()) { - TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID) - .detail("BatchIndex", batchIndex) - .detail("Reason", "NoMutationAtVersions"); - return Void(); - } - - // Sanity check the restoreApplierKeys, which should be empty at this point loop { try { tr->reset(); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); - Key begin = restoreApplierKeyFor(applierID, batchIndex, 0); - Key end = restoreApplierKeyFor(applierID, batchIndex, std::numeric_limits<int64_t>::max()); - Standalone<RangeResultRef> txnIds = wait(tr->getRange(KeyRangeRef(begin, end), CLIENT_KNOBS->TOO_MANY)); - if (txnIds.size() > 0) { - TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean").detail("TxnIds", txnIds.size()); - for (auto& kv : txnIds) { - UID id; - int64_t index; - Version txnId; - std::tie(id, index, txnId) = decodeRestoreApplierKey(kv.key); - TraceEvent(SevError, "FastRestoreApplyTxnStateNotClean") - .detail("Applier", id) - .detail("BatchIndex", index) - .detail("ResidueTxnID", txnId); - } + for (auto& range : ranges) { + tr->clear(range); } - break; - } catch (Error& e) { - wait(tr->onError(e)); - } - } - - loop { // Transaction retry loop - try { - // Check if the transaction succeeds - if (progress.hasError()) { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - Optional<Value> txnSucceeded = - wait(tr->get(restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId))); - if (!txnSucceeded.present()) { - progress.rollback(); - continue; - } else { - TraceEvent(SevWarn, "FastRestoreApplyTxnError") - .detail("TxnStatusSucceeded", progress.curTxnId) - .detail("ApplierApplyToDB", applierID) - .detail("CurIteratorVersion", progress.curItInCurTxn->first) - .detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size()) - .detail("CurrentIndexInSucceedTxn", progress.curIndexInCurTxn) - .detail("NumIncludedAtomicOps", progress.numAtomicOps); - // Txn succeeded and exectue the same logic when txn succeeds - } - } else { // !lastTxnHasError: accumulate mutations in a txn - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - TraceEvent(SevFRMutationInfo, "FastRestore_ApplierTxn") - .detail("ApplierApplyToDB", applierID) - .detail("TxnId", progress.curTxnId) - .detail("CurrentIndexInCurrentTxn", progress.curIndexInCurTxn) - .detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size()) - .detail("Version", progress.curItInCurTxn->first); - - // restoreApplierKeyFor(self->id(), curTxnId) to tell if txn succeeds at an unknown error - tr->set(restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId), restoreApplierTxnValue); - - while (1) { // Loop: Accumulate mutations in a transaction - MutationRef m = progress.getCurrentMutation(); - - if (m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP) { - typeStr = typeString[m.type]; - } else { - TraceEvent(SevError, "FastRestore").detail("InvalidMutationType", m.type); - } - - TraceEvent(SevFRMutationInfo, "FastRestore") - .detail("ApplierApplyToDB", applierID) - .detail("Version", progress.curItInCurTxn->first) - .detail("Index", progress.curIndexInCurTxn) - .detail("Mutation", m.toString()) - .detail("MutationSize", m.totalSize()) - .detail("TxnSize", progress.txnBytes); - if (m.type == MutationRef::SetValue) { - tr->set(m.param1, m.param2); - } else if (m.type == MutationRef::ClearRange) { - KeyRangeRef mutationRange(m.param1, m.param2); - tr->clear(mutationRange); - } else if (isAtomicOp((MutationRef::Type)m.type)) { - tr->atomicOp(m.param1, m.param2, m.type); - progress.numAtomicOps++; - } else { - TraceEvent(SevError, "FastRestore") - .detail("UnhandledMutationType", m.type) - .detail("TypeName", typeStr); - } - - progress.txnBytes += m.totalSize(); // Changed expectedSize to totalSize - progress.txnMutations += 1; - - progress.nextMutation(); // Prepare for the next mutation - // commit per FASTRESTORE_TXN_BATCH_MAX_BYTES bytes; and commit does not cross version boundary - if (progress.shouldCommit()) { - break; // Got enough mutation in the txn - } - } - } // !lastTxnHasError - - // Commit the txn and prepare the starting point for next txn - if (progress.shouldCommit()) { - wait(tr->commit()); - // Update status counter appliedWeightedBytes, appliedMutations, atomicOps - batchData->counters.appliedWeightedBytes += progress.txnBytes; - batchData->counters.appliedMutations += progress.txnMutations; - batchData->counters.appliedAtomicOps += progress.numAtomicOps; - batchData->counters.appliedTxns += 1; - } - - if (progress.isDone()) { // Are all mutations processed? - break; - } - progress.nextTxn(); - } catch (Error& e) { - TraceEvent(SevWarnAlways, "FastRestoreApplyTxnError") - .detail("TxnStatus", "?") - .detail("ApplierApplyToDB", applierID) - .detail("TxnId", progress.curTxnId) - .detail("CurrentIndexInCurrentTxn", progress.curIndexInCurTxn) - .detail("Version", progress.curItInCurTxn->first) - .error(e, true); - progress.lastTxnHasError = true; - wait(tr->onError(e)); - } - } - - TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID) - .detail("BatchIndex", batchIndex) - .detail("CleanupCurTxnIds", progress.curTxnId); - // clean up txn ids - loop { - try { - tr->reset(); - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::LOCK_AWARE); - // Clear txnIds in [0, progress.curTxnId). We add 100 to curTxnId just to be safe. - tr->clear(KeyRangeRef(restoreApplierKeyFor(applierID, batchIndex, 0), - restoreApplierKeyFor(applierID, batchIndex, progress.curTxnId + 100))); wait(tr->commit()); break; } catch (Error& e) { wait(tr->onError(e)); } } - // House cleaning - batchData->kvOps.clear(); + return Void(); +} + +// Get keys in imcompleteStagingKeys and precompute the stagingKey which is stored in batchData->stagingKeys +ACTOR static Future<Void> getAndComputeStagingKeys( + std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys, Database cx, UID applierID) { + state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx)); + state std::vector<Future<Optional<Value>>> fValues; + state int i = 0; + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + for (auto& key : imcompleteStagingKeys) { + fValues.push_back(tr->get(key.first)); + } + wait(waitForAll(fValues)); + break; + } catch (Error& e) { + TraceEvent(SevError, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("GetKeys", imcompleteStagingKeys.size()) + .detail("Error", e.what()) + .detail("ErrorCode", e.code()); + wait(tr->onError(e)); + fValues.clear(); + } + } + + ASSERT(fValues.size() == imcompleteStagingKeys.size()); + int i = 0; + for (auto& key : imcompleteStagingKeys) { + if (!fValues[i].get().present()) { + TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("Key", key.first) + .detail("Reason", "Not found in DB") + .detail("PendingMutations", key.second->second.pendingMutations.size()) + .detail("StagingKeyType", (int)key.second->second.type); + for (auto& vm : key.second->second.pendingMutations) { + for (auto& m : vm.second) { + TraceEvent(SevWarnAlways, "FastRestoreApplierGetAndComputeStagingKeysUnhandledError") + .detail("PendingMutationVersion", vm.first) + .detail("PendingMutation", m.toString()); + } + } + key.second->second.precomputeResult(); + i++; + continue; + } else { + // The key's version ideally should be the most recently committed version. + // But as long as it is > 1 and less than the start version of the version batch, it is the same result. + MutationRef m(MutationRef::SetValue, key.first, fValues[i].get().get()); + key.second->second.add(m, (Version)1); + key.second->second.precomputeResult(); + i++; + } + } + + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysDone", applierID) + .detail("GetKeys", imcompleteStagingKeys.size()); + + return Void(); +} + +ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData> batchData, UID applierID, + int64_t batchIndex, Database cx) { + // Apply range mutations (i.e., clearRange) to database cx + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Applying clear range mutations to DB") + .detail("ClearRanges", batchData->stagingKeyRanges.size()); + state std::vector<Future<Void>> fClearRanges; + std::vector<Standalone<VectorRef<KeyRangeRef>>> clearBuf; + clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>()); + Standalone<VectorRef<KeyRangeRef>> clearRanges = clearBuf.back(); + double curTxnSize = 0; + for (auto& rangeMutation : batchData->stagingKeyRanges) { + KeyRangeRef range(rangeMutation.mutation.param1, rangeMutation.mutation.param2); + clearRanges.push_back(clearRanges.arena(), range); + curTxnSize += range.expectedSize(); + if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { + fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx)); + clearBuf.push_back(Standalone<VectorRef<KeyRangeRef>>()); + clearRanges = clearBuf.back(); + curTxnSize = 0; + } + } + if (curTxnSize > 0) { + fClearRanges.push_back(applyClearRangeMutations(clearRanges, cx)); + } + + // Apply range mutations (i.e., clearRange) to stagingKeyRanges + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Applying clear range mutations to staging keys") + .detail("ClearRanges", batchData->stagingKeyRanges.size()); + for (auto& rangeMutation : batchData->stagingKeyRanges) { + std::map<Key, StagingKey>::iterator lb = batchData->stagingKeys.lower_bound(rangeMutation.mutation.param1); + std::map<Key, StagingKey>::iterator ub = batchData->stagingKeys.upper_bound(rangeMutation.mutation.param2); + while (lb != ub) { + lb->second.add(rangeMutation.mutation, rangeMutation.version); + lb++; + } + } + + wait(waitForAll(fClearRanges)); + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Getting and computing staging keys") + .detail("StagingKeys", batchData->stagingKeys.size()); + + // Get keys in stagingKeys which does not have a baseline key by reading database cx, and precompute the key's value + std::map<Key, std::map<Key, StagingKey>::iterator> imcompleteStagingKeys; + std::map<Key, StagingKey>::iterator stagingKeyIter = batchData->stagingKeys.begin(); + for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) { + if (!stagingKeyIter->second.hasBaseValue()) { + imcompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter); + } + } + + Future<Void> fGetAndComputeKeys = getAndComputeStagingKeys(imcompleteStagingKeys, cx, applierID); + + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) + .detail("BatchIndex", batchIndex) + .detail("Step", "Compute the other staging keys") + .detail("StagingKeys", batchData->stagingKeys.size()); + // Pre-compute pendingMutations to other keys in stagingKeys that has base value + for (stagingKeyIter = batchData->stagingKeys.begin(); stagingKeyIter != batchData->stagingKeys.end(); + stagingKeyIter++) { + if (stagingKeyIter->second.hasBaseValue()) { + stagingKeyIter->second.precomputeResult(); + } + } + + TraceEvent("FastRestoreApplierGetAndComputeStagingKeysWaitOn", applierID); + wait(fGetAndComputeKeys); + + // Sanity check all stagingKeys have been precomputed + ASSERT_WE_THINK(batchData->allKeysPrecomputed()); + + TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResultDone", applierID).detail("BatchIndex", batchIndex); + + return Void(); +} + +// Apply mutations in batchData->stagingKeys [begin, end). +ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin, + std::map<Key, StagingKey>::iterator end, Database cx, + FlowLock* applyStagingKeysBatchLock, UID applierID) { + wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); + state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); + state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx)); + state int sets = 0; + state int clears = 0; + TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatch", applierID).detail("Begin", begin->first); + loop { + try { + tr->reset(); + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + std::map<Key, StagingKey>::iterator iter = begin; + while (iter != end) { + if (iter->second.type == MutationRef::SetValue) { + tr->set(iter->second.key, iter->second.val); + sets++; + } else if (iter->second.type == MutationRef::ClearRange) { + tr->clear(KeyRangeRef(iter->second.key, iter->second.val)); + clears++; + } else { + ASSERT(false); + } + iter++; + if (sets > 10000000 || clears > 10000000) { + TraceEvent(SevError, "FastRestoreApplierPhaseApplyStagingKeysBatchInfiniteLoop", applierID) + .detail("Begin", begin->first) + .detail("Sets", sets) + .detail("Clears", clears); + } + } + TraceEvent("FastRestoreApplierPhaseApplyStagingKeysBatchPrecommit", applierID) + .detail("Begin", begin->first) + .detail("Sets", sets) + .detail("Clears", clears); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + return Void(); +} + +// Apply mutations in stagingKeys in batches in parallel +ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData, UID applierID, int64_t batchIndex, + Database cx) { + std::map<Key, StagingKey>::iterator begin = batchData->stagingKeys.begin(); + std::map<Key, StagingKey>::iterator cur = begin; + double txnSize = 0; + std::vector<Future<Void>> fBatches; + TraceEvent("FastRestoreApplerPhaseApplyStagingKeys", applierID) + .detail("BatchIndex", batchIndex) + .detail("StagingKeys", batchData->stagingKeys.size()); + while (cur != batchData->stagingKeys.end()) { + txnSize += cur->second.expectedMutationSize(); + if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID)); + begin = cur; + txnSize = 0; + } + cur++; + } + if (begin != batchData->stagingKeys.end()) { + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID)); + } + + wait(waitForAll(fBatches)); + + TraceEvent("FastRestoreApplerPhaseApplyStagingKeysDone", applierID) + .detail("BatchIndex", batchIndex) + .detail("StagingKeys", batchData->stagingKeys.size()); + return Void(); +} + +// Write mutations to the destination DB +ACTOR Future<Void> writeMutationsToDB(UID applierID, int64_t batchIndex, Reference<ApplierBatchData> batchData, + Database cx) { + TraceEvent("FastRestoreApplerPhaseApplyTxn", applierID).detail("BatchIndex", batchIndex); + wait(precomputeMutationsResult(batchData, applierID, batchIndex, cx)); + + wait(applyStagingKeys(batchData, applierID, batchIndex, cx)); TraceEvent("FastRestoreApplerPhaseApplyTxnDone", applierID).detail("BatchIndex", batchIndex); return Void(); @@ -480,7 +440,7 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, if (!batchData->dbApplier.present()) { isDuplicated = false; batchData->dbApplier = Never(); - batchData->dbApplier = applyToDB(self->id(), req.batchIndex, batchData, cx); + batchData->dbApplier = writeMutationsToDB(self->id(), req.batchIndex, batchData, cx); } ASSERT(batchData->dbApplier.present()); @@ -496,4 +456,35 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req, req.reply.send(RestoreCommonReply(self->id(), isDuplicated)); return Void(); -} \ No newline at end of file +} + +// Copy from WriteDuringRead.actor.cpp with small modifications +// Not all AtomicOps are handled in this function: SetVersionstampedKey, SetVersionstampedValue, and CompareAndClear +Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type) { + Arena arena; + if (type == MutationRef::AddValue) + return doLittleEndianAdd(existingValue, value, arena); + else if (type == MutationRef::AppendIfFits) + return doAppendIfFits(existingValue, value, arena); + else if (type == MutationRef::And || type == MutationRef::AndV2) + return doAndV2(existingValue, value, arena); + else if (type == MutationRef::Or) + return doOr(existingValue, value, arena); + else if (type == MutationRef::Xor) + return doXor(existingValue, value, arena); + else if (type == MutationRef::Max) + return doMax(existingValue, value, arena); + else if (type == MutationRef::Min || type == MutationRef::MinV2) + return doMinV2(existingValue, value, arena); + else if (type == MutationRef::ByteMin) + return doByteMin(existingValue, value, arena); + else if (type == MutationRef::ByteMax) + return doByteMax(existingValue, value, arena); + else { + TraceEvent(SevError, "ApplyAtomicOpUnhandledType") + .detail("TypeCode", (int)type) + .detail("TypeName", typeString[type]); + ASSERT(false); + } + return Value(); +} diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index e06e772dca..6782186650 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -29,6 +29,7 @@ #include <sstream> #include "flow/Stats.h" +#include "fdbclient/Atomic.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/CommitTransaction.h" #include "fdbrpc/fdbrpc.h" @@ -40,11 +41,174 @@ #include "flow/actorcompiler.h" // has to be last include +Value applyAtomicOp(Optional<StringRef> existingValue, Value value, MutationRef::Type type); + +// Key whose mutations are buffered on applier. +// key, value, type and version defines the parsed mutation at version. +// pendingMutations has all versioned mutations to be applied. +// Mutations in pendingMutations whose version is below the version in StagingKey can be ignored in applying phase. +struct StagingKey { + Key key; // TODO: Maybe not needed? + Value val; + MutationRef::Type type; // set or clear + Version version; // largest version of set or clear for the key + std::map<Version, MutationsVec> pendingMutations; // mutations not set or clear type + + explicit StagingKey() : version(0), type(MutationRef::MAX_ATOMIC_OP) {} + + // Add mutation m at newVersion to stagingKey + // Assume: SetVersionstampedKey and SetVersionstampedValue have been converted to set + void add(const MutationRef& m, Version newVersion) { + ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue); + if (version < newVersion) { + if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { + key = m.param1; + val = m.param2; + type = (MutationRef::Type)m.type; + version = newVersion; + } else { + if (pendingMutations.find(newVersion) == pendingMutations.end()) { + pendingMutations.emplace(newVersion, MutationsVec()); + } + // TODO: Do we really need deep copy? + MutationsVec& mutations = pendingMutations[newVersion]; + mutations.push_back_deep(mutations.arena(), m); + } + } else if (version == newVersion) { // Sanity check + TraceEvent("FastRestoreApplierStagingKeyMutationAtSameVersion") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]); + if (m.type == MutationRef::SetValue) { + if (type == MutationRef::SetValue) { + if (m.param2 != val) { + TraceEvent(SevError, "FastRestoreApplierStagingKeyMutationAtSameVersionUnhandled") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val) + .detail("Investigate", + "Why would backup have two sets with different value at same version"); + } // else {} Backup has duplicate set at the same version + } else { + TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionOverride") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val); + type = (MutationRef::Type)m.type; + val = m.param2; + } + } else if (m.type == MutationRef::ClearRange) { + TraceEvent(SevWarnAlways, "FastRestoreApplierStagingKeyMutationAtSameVersionSkipped") + .detail("Version", newVersion) + .detail("NewMutation", m.toString()) + .detail("ExistingKeyType", typeString[type]) + .detail("ExitingKeyValue", val); + } + } // else input mutation is old and can be ignored + } + + // Precompute the final value of the key. + void precomputeResult() { + TraceEvent(SevDebug, "FastRestoreApplierPrecomputeResult") + .detail("Key", key) + .detail("Version", version) + .detail("LargestPendingVersion", (pendingMutations.empty() ? -1 : pendingMutations.rbegin()->first)); + std::map<Version, MutationsVec>::iterator lb = pendingMutations.lower_bound(version); + if (lb == pendingMutations.end()) { + return; + } + if (lb->first == version) { + // Sanity check mutations at version are either atomicOps which can be ignored or the same value as buffered + for (int i = 0; i < lb->second.size(); i++) { + MutationRef m = lb->second[i]; + if (m.type == MutationRef::SetValue || m.type == MutationRef::ClearRange) { + if (std::tie(type, key, val) != std::tie(m.type, m.param1, m.param2)) { + TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnhandledSituation") + .detail("BufferedType", typeString[type]) + .detail("PendingType", typeString[m.type]) + .detail("BufferedVal", val.toString()) + .detail("PendingVal", m.param2.toString()); + } + } + } + } + while (lb != pendingMutations.end()) { + if (lb->first == version) { + lb++; + continue; + } + for (auto& mutation : lb->second) { + if (type == MutationRef::CompareAndClear) { // Special atomicOp + Arena arena; + Optional<ValueRef> retVal = doCompareAndClear(val, mutation.param2, arena); + if (!retVal.present()) { + val = key; + type = MutationRef::ClearRange; + } // else no-op + } else if (isAtomicOp((MutationRef::Type)mutation.type)) { + Optional<StringRef> inputVal; + if (hasBaseValue()) { + inputVal = val; + } + val = applyAtomicOp(inputVal, mutation.param2, (MutationRef::Type)mutation.type); + type = MutationRef::SetValue; // Precomputed result should be set to DB. + } else if (mutation.type == MutationRef::SetValue || mutation.type == MutationRef::ClearRange) { + type = MutationRef::SetValue; // Precomputed result should be set to DB. + TraceEvent(SevError, "FastRestoreApplierPrecomputeResultUnexpectedSet") + .detail("Type", typeString[mutation.type]) + .detail("Version", lb->first); + } else { + TraceEvent(SevWarnAlways, "FastRestoreApplierPrecomputeResultSkipUnexpectedBackupMutation") + .detail("Type", typeString[mutation.type]) + .detail("Version", lb->first); + } + } + version = lb->first; + lb++; + } + } + + // Does the key has at least 1 set or clear mutation to get the base value + bool hasBaseValue() { + if (version > 0) { + ASSERT(type == MutationRef::SetValue || type == MutationRef::ClearRange); + } + return version > 0; + } + + // Has all pendingMutations been pre-applied to the val? + bool hasPrecomputed() { + ASSERT(pendingMutations.empty() || pendingMutations.rbegin()->first >= pendingMutations.begin()->first); + return pendingMutations.empty() || version >= pendingMutations.rbegin()->first; + } + + int expectedMutationSize() { return key.size() + val.size(); } +}; + +// The range mutation received on applier. +// Range mutations should be applied both to the destination DB and to the StagingKeys +struct StagingKeyRange { + Standalone<MutationRef> mutation; + Version version; + + explicit StagingKeyRange(MutationRef m, Version newVersion) : mutation(m), version(newVersion) {} + + bool operator<(const StagingKeyRange& rhs) const { + return std::tie(version, mutation.type, mutation.param1, mutation.param2) < + std::tie(rhs.version, rhs.mutation.type, rhs.mutation.param1, rhs.mutation.param2); + } +}; + struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> { // processedFileState: key: RestoreAsset; value: largest version of mutation received on the applier std::map<RestoreAsset, NotifiedVersion> processedFileState; Optional<Future<Void>> dbApplier; VersionedMutationsMap kvOps; // Mutations at each version + std::map<Key, StagingKey> stagingKeys; + std::set<StagingKeyRange> stagingKeyRanges; + FlowLock applyStagingKeysBatchLock; Future<Void> pollMetrics; @@ -66,7 +230,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> { void addref() { return ReferenceCounted<ApplierBatchData>::addref(); } void delref() { return ReferenceCounted<ApplierBatchData>::delref(); } - explicit ApplierBatchData(UID nodeID, int batchIndex) : counters(this, nodeID, batchIndex) { + explicit ApplierBatchData(UID nodeID, int batchIndex) + : counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM) { pollMetrics = traceCounters("FastRestoreApplierMetrics", nodeID, SERVER_KNOBS->FASTRESTORE_ROLE_LOGGING_DELAY, &counters.cc, nodeID.toString() + "/RestoreApplierMetrics/" + std::to_string(batchIndex)); @@ -74,6 +239,50 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> { } ~ApplierBatchData() = default; + void addMutation(MutationRef m, Version ver) { + if (!isRangeMutation(m)) { + auto item = stagingKeys.emplace(m.param1, StagingKey()); + item.first->second.add(m, ver); + } else { + stagingKeyRanges.insert(StagingKeyRange(m, ver)); + } + } + + void addVersionStampedKV(MutationRef m, Version ver, uint16_t numVersionStampedKV) { + if (m.type == MutationRef::SetVersionstampedKey) { + // Assume transactionNumber = 0 does not affect result + TraceEvent(SevDebug, "FastRestoreApplierAddMutation") + .detail("MutationType", typeString[m.type]) + .detail("FakedTransactionNumber", numVersionStampedKV); + transformVersionstampMutation(m, &MutationRef::param1, ver, numVersionStampedKV); + addMutation(m, ver); + } else if (m.type == MutationRef::SetVersionstampedValue) { + // Assume transactionNumber = 0 does not affect result + TraceEvent(SevDebug, "FastRestoreApplierAddMutation") + .detail("MutationType", typeString[m.type]) + .detail("FakedTransactionNumber", numVersionStampedKV); + transformVersionstampMutation(m, &MutationRef::param2, ver, numVersionStampedKV); + addMutation(m, ver); + } else { + ASSERT(false); + } + } + + // Return true if all staging keys have been precomputed + bool allKeysPrecomputed() { + for (auto& stagingKey : stagingKeys) { + if (!stagingKey.second.hasPrecomputed()) { + TraceEvent("FastRestoreApplierAllKeysPrecomputedFalse") + .detail("Key", stagingKey.first) + .detail("BufferedVersion", stagingKey.second.version) + .detail("MaxPendingVersion", stagingKey.second.pendingMutations.rbegin()->first); + return false; + } + } + TraceEvent("FastRestoreApplierAllKeysPrecomputed"); + return true; + } + void reset() { kvOps.clear(); dbApplier = Optional<Future<Void>>(); diff --git a/fdbserver/RestoreCommon.actor.cpp b/fdbserver/RestoreCommon.actor.cpp index d8689d136f..0c336538da 100644 --- a/fdbserver/RestoreCommon.actor.cpp +++ b/fdbserver/RestoreCommon.actor.cpp @@ -292,8 +292,6 @@ std::string RestoreConfigFR::toString() { return ss.str(); } -//typedef RestoreConfigFR::RestoreFile RestoreFileFR; - // parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied // The implementation of parallelFileRestore is copied from FileBackupAgent.actor.cpp // parallelFileRestore is copied from FileBackupAgent.actor.cpp for the same reason as RestoreConfigFR is copied diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index 8eb9bc2e4b..c0833fc373 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -264,7 +264,7 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ .detail("BatchIndex", req.batchIndex) .detail("UseRangeFile", req.useRangeFile); wait(batchStatus->sendAllRanges.get()); - } else if (req.useRangeFile) { + } else { TraceEvent(SevDebug, "FastRestoreSendMutationsSkipDuplicateRangeRequest", self->id()) .detail("BatchIndex", req.batchIndex) .detail("UseRangeFile", req.useRangeFile); diff --git a/fdbserver/RestoreMaster.actor.cpp b/fdbserver/RestoreMaster.actor.cpp index eb39883b63..eccbc6bc21 100644 --- a/fdbserver/RestoreMaster.actor.cpp +++ b/fdbserver/RestoreMaster.actor.cpp @@ -746,7 +746,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat int batchIndex, NotifiedVersion* finishedBatch) { wait(finishedBatch->whenAtLeast(batchIndex - 1)); - TraceEvent("FastRestoreMasterPhaseApplyToDBStart") + TraceEvent("FastRestoreMasterPhaseApplyToDB") .detail("BatchIndex", batchIndex) .detail("FinishedBatch", finishedBatch->get()); @@ -754,7 +754,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat // Prepare the applyToDB requests std::vector<std::pair<UID, RestoreVersionBatchRequest>> requests; - TraceEvent("FastRestoreMasterPhaseApplyMutations") + TraceEvent("FastRestoreMasterPhaseApplyToDB") .detail("BatchIndex", batchIndex) .detail("Appliers", appliersInterf.size()); for (auto& applier : appliersInterf) { @@ -770,7 +770,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat batchData->applyToDB = getBatchReplies(&RestoreApplierInterface::applyToDB, appliersInterf, requests, &replies, TaskPriority::RestoreApplierWriteDB); } else { - TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations") + TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB") .detail("BatchIndex", batchIndex) .detail("Attention", "Actor should not be invoked twice for the same batch index"); } @@ -783,7 +783,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat if (batchStatus->applyStatus[reply.id] == RestoreApplyStatus::Applying) { batchStatus->applyStatus[reply.id] = RestoreApplyStatus::Applied; if (reply.isDuplicated) { - TraceEvent(SevWarn, "FastRestoreNotifyApplierToApplierMutations") + TraceEvent(SevWarn, "FastRestoreMasterPhaseApplyToDB") .detail("Applier", reply.id) .detail("DuplicateRequestReturnEarlier", "Apply db request should have been processed"); } @@ -791,7 +791,7 @@ ACTOR static Future<Void> notifyApplierToApplyMutations(Reference<MasterBatchDat } for (auto& applier : appliersInterf) { if (batchStatus->applyStatus[applier.first] != RestoreApplyStatus::Applied) { - TraceEvent(SevError, "FastRestoreNotifyApplierToApplierMutations") + TraceEvent(SevError, "FastRestoreMasterPhaseApplyToDB") .detail("Applier", applier.first) .detail("ApplyStatus", batchStatus->applyStatus[applier.first]); } diff --git a/fdbserver/workloads/RandomSelector.actor.cpp b/fdbserver/workloads/RandomSelector.actor.cpp index 8e27a0fe55..8e0286fa53 100644 --- a/fdbserver/workloads/RandomSelector.actor.cpp +++ b/fdbserver/workloads/RandomSelector.actor.cpp @@ -288,7 +288,7 @@ struct RandomSelectorWorkload : TestWorkload { myValue = format("%d", deterministicRandom()->randomInt( 0, 10000000 ) ); //TraceEvent("RYOWor").detail("Key",myKeyA).detail("Value", myValue); trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::Or); - + loop { try { tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef()); @@ -311,7 +311,7 @@ struct RandomSelectorWorkload : TestWorkload { myValue = format("%d", deterministicRandom()->randomInt( 0, 10000000 ) ); //TraceEvent("RYOWxor").detail("Key",myKeyA).detail("Value", myValue); trRYOW.atomicOp(StringRef(clientID + "b/" + myKeyA), myValue, MutationRef::Xor); - + loop { try { tr.set(StringRef(clientID + "z/" + myRandomIDKey), StringRef());