FastRestore:applyToDB:Add functions to DBApplyProgress for encapsulation

This commit is contained in:
Meng Xu 2019-10-14 16:18:54 -07:00
parent f89b5586df
commit 0c8de91932
1 changed files with 31 additions and 16 deletions

View File

@ -204,6 +204,15 @@ struct DBApplyProgress {
// Rollback to the starting point of the uncommitted-and-failed transaction to
// re-execute uncommitted txn
void rollback() {
TraceEvent(SevWarn, "FastRestore_ApplyTxnError")
.detail("TxnStatusFailed", curTxnId)
.detail("ApplierApplyToDB", self->id())
.detail("UncommittedTxnId", uncommittedTxnId)
.detail("CurIteratorVersion", curItInCurTxn->first)
.detail("StartIteratorVersionInUncommittedTxn", startItInUncommittedTxn->first)
.detail("CurrentIndexInFailedTxn", curIndexInCurTxn)
.detail("StartIndexInUncommittedTxn", startIndexInUncommittedTxn)
.detail("NumIncludedAtomicOps", numAtomicOps);
curItInCurTxn = startItInUncommittedTxn;
curIndexInCurTxn = startIndexInUncommittedTxn;
curTxnId = uncommittedTxnId;
@ -219,6 +228,24 @@ struct DBApplyProgress {
// mutations in a txn
return (!lastTxnHasError && (startNextVersion || transactionSize > 0 || curItInCurTxn == self->kvOps.end()));
}
bool hasError() { return lastTxnHasError; }
void setTxnError(Error& e) {
TraceEvent(SevWarnAlways, "FastRestore_ApplyTxnError")
.detail("TxnStatus", "?")
.detail("ApplierApplyToDB", self->id())
.detail("TxnId", curTxnId)
.detail("StartIndexInCurrentTxn", curIndexInCurTxn)
.detail("Version", curItInCurTxn->first)
.error(e, true);
lastTxnHasError = true;
}
MutationRef getCurrentMutation() {
ASSERT_WE_THINK(curIndexInCurTxn < curItInCurTxn->second.size());
return curItInCurTxn->second[curIndexInCurTxn];
}
};
ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
@ -253,21 +280,12 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
loop { // Transaction retry loop
try {
// Check if the transaction succeeds
if (progress.lastTxnHasError) {
if (progress.hasError()) {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> txnSucceeded = wait(tr->get(restoreApplierKeyFor(self->id(), progress.curTxnId)));
if (!txnSucceeded.present()) {
TraceEvent(SevWarn, "FastRestore_ApplyTxnError")
.detail("TxnStatusFailed", progress.curTxnId)
.detail("ApplierApplyToDB", self->id())
.detail("UncommittedTxnId", progress.uncommittedTxnId)
.detail("CurIteratorVersion", progress.curItInCurTxn->first)
.detail("StartIteratorVersionInUncommittedTxn", progress.startItInUncommittedTxn->first)
.detail("CurrentIndexInFailedTxn", progress.curIndexInCurTxn)
.detail("StartIndexInUncommittedTxn", progress.startIndexInUncommittedTxn)
.detail("NumIncludedAtomicOps", progress.numAtomicOps);
progress.rollback();
continue;
} else {
@ -278,7 +296,6 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
.detail("CurrentIteratorMutations", progress.curItInCurTxn->second.size())
.detail("CurrentIndexInSucceedTxn", progress.curIndexInCurTxn)
.detail("NumIncludedAtomicOps", progress.numAtomicOps);
// Txn succeeded and exectue the same logic when txn succeeds
}
} else { // !lastTxnHasError: accumulate mutations in a txn
@ -296,10 +313,8 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
tr->set(restoreApplierKeyFor(self->id(), progress.curTxnId), restoreApplierTxnValue);
while (1) { // Loop: Accumulate mutations in a transaction
state MutationRef m;
ASSERT_WE_THINK(progress.curIndexInCurTxn < progress.curItInCurTxn->second.size());
state MutationRef m = progress.getCurrentMutation();
m = progress.curItInCurTxn->second[progress.curIndexInCurTxn];
if (m.type >= MutationRef::Type::SetValue && m.type <= MutationRef::Type::MAX_ATOMIC_OP) {
typeStr = typeString[m.type];
} else {
@ -328,7 +343,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
} else {
progress.nextMutation();
// Mutations in the same transaction come from the same version
if (progress.startNextVersion || progress.curItInCurTxn == self->kvOps.end()) {
if (progress.startNextVersion || progress.isDone()) {
break;
}
}
@ -341,7 +356,7 @@ ACTOR Future<Void> applyToDB(Reference<RestoreApplierData> self, Database cx) {
}
// Logic for a successful transaction: Update current txn info and uncommitted txn info
progress.nextMutation();
if (progress.curItInCurTxn == self->kvOps.end()) { // Are all mutations processed?
if (progress.isDone()) { // Are all mutations processed?
break;
}
progress.nextTxn();