From 0c8de919325ac97e5ecfa9d51bf6723a7dd4e21c Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Mon, 14 Oct 2019 16:18:54 -0700 Subject: [PATCH] FastRestore:applyToDB:Add functions to DBApplyProgress for encapsulation --- fdbserver/RestoreApplier.actor.cpp | 47 ++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index a4976d1be6..123a8a236b 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -204,6 +204,15 @@ struct DBApplyProgress { // Rollback to the starting point of the uncommitted-and-failed transaction to // re-execute uncommitted txn void rollback() { + TraceEvent(SevWarn, "FastRestore_ApplyTxnError") + .detail("TxnStatusFailed", curTxnId) + .detail("ApplierApplyToDB", self->id()) + .detail("UncommittedTxnId", uncommittedTxnId) + .detail("CurIteratorVersion", curItInCurTxn->first) + .detail("StartIteratorVersionInUncommittedTxn", startItInUncommittedTxn->first) + .detail("CurrentIndexInFailedTxn", curIndexInCurTxn) + .detail("StartIndexInUncommittedTxn", startIndexInUncommittedTxn) + .detail("NumIncludedAtomicOps", numAtomicOps); curItInCurTxn = startItInUncommittedTxn; curIndexInCurTxn = startIndexInUncommittedTxn; curTxnId = uncommittedTxnId; @@ -219,6 +228,24 @@ struct DBApplyProgress { // mutations in a txn return (!lastTxnHasError && (startNextVersion || transactionSize > 0 || curItInCurTxn == self->kvOps.end())); } + + bool hasError() { return lastTxnHasError; } + + void setTxnError(Error& e) { + TraceEvent(SevWarnAlways, "FastRestore_ApplyTxnError") + .detail("TxnStatus", "?") + .detail("ApplierApplyToDB", self->id()) + .detail("TxnId", curTxnId) + .detail("StartIndexInCurrentTxn", curIndexInCurTxn) + .detail("Version", curItInCurTxn->first) + .error(e, true); + lastTxnHasError = true; + } + + MutationRef getCurrentMutation() { + ASSERT_WE_THINK(curIndexInCurTxn < curItInCurTxn->second.size()); + return curItInCurTxn->second[curIndexInCurTxn]; + } }; ACTOR Future applyToDB(Reference self, Database cx) { @@ -253,21 +280,12 @@ ACTOR Future applyToDB(Reference self, Database cx) { loop { // Transaction retry loop try { // Check if the transaction succeeds - if (progress.lastTxnHasError) { + if (progress.hasError()) { tr->reset(); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::LOCK_AWARE); Optional txnSucceeded = wait(tr->get(restoreApplierKeyFor(self->id(), progress.curTxnId))); if (!txnSucceeded.present()) { - TraceEvent(SevWarn, "FastRestore_ApplyTxnError") - .detail("TxnStatusFailed", progress.curTxnId) - .detail("ApplierApplyToDB", self->id()) - .detail("UncommittedTxnId", progress.uncommittedTxnId) - .detail("CurIteratorVersion", progress.curItInCurTxn->first) - .detail("StartIteratorVersionInUncommittedTxn", progress.startItInUncommittedTxn->first) - .detail("CurrentIndexInFailedTxn", progress.curIndexInCurTxn) - .detail("StartIndexInUncommittedTxn", progress.startIndexInUncommittedTxn) - .detail("NumIncludedAtomicOps", progress.numAtomicOps); progress.rollback(); continue; } else { @@ -278,7 +296,6 @@ ACTOR Future applyToDB(Reference self, Database cx) { .detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size()) .detail("CurrentIndexInSucceedTxn", progress.curIndexInCurTxn) .detail("NumIncludedAtomicOps", progress.numAtomicOps); - // Txn succeeded and exectue the same logic when txn succeeds } } else { // !lastTxnHasError: accumulate mutations in a txn @@ -296,10 +313,8 @@ ACTOR Future applyToDB(Reference self, Database cx) { tr->set(restoreApplierKeyFor(self->id(), progress.curTxnId), restoreApplierTxnValue); while (1) { // Loop: Accumulate mutations in a transaction - state MutationRef m; - ASSERT_WE_THINK(progress.curIndexInCurTxn < progress.curItInCurTxn->second.size()); + state MutationRef m = progress.getCurrentMutation(); - m = progress.curItInCurTxn->second[progress.curIndexInCurTxn]; if (m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP) { typeStr = typeString[m.type]; } else { @@ -328,7 +343,7 @@ ACTOR Future applyToDB(Reference self, Database cx) { } else { progress.nextMutation(); // Mutations in the same transaction come from the same version - if (progress.startNextVersion || progress.curItInCurTxn == self->kvOps.end()) { + if (progress.startNextVersion || progress.isDone()) { break; } } @@ -341,7 +356,7 @@ ACTOR Future applyToDB(Reference self, Database cx) { } // Logic for a successful transaction: Update current txn info and uncommitted txn info progress.nextMutation(); - if (progress.curItInCurTxn == self->kvOps.end()) { // Are all mutations processed? + if (progress.isDone()) { // Are all mutations processed? break; } progress.nextTxn();