From ecd2d8b23973cd0ad90f767cbcb4389254a7270c Mon Sep 17 00:00:00 2001 From: Meng Xu Date: Sat, 27 Jun 2020 00:20:54 -0700 Subject: [PATCH] FastRestore:Add counters for applier and disable FlowLock on applyStagingKeysBatch --- fdbserver/RestoreApplier.actor.cpp | 35 +++++++++++++++++---------- fdbserver/RestoreApplier.actor.h | 9 ++++--- fdbserver/RestoreRoleCommon.actor.cpp | 1 + 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index c2c9f9e04b..079adea3d8 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -214,13 +214,13 @@ ACTOR static Future applyClearRangeMutations(StandalonestagingKeys ACTOR static Future getAndComputeStagingKeys( std::map::iterator> incompleteStagingKeys, double delayTime, Database cx, - UID applierID, int batchIndex) { + UID applierID, int batchIndex, ApplierBatchData::Counters* cc) { state Reference tr(new ReadYourWritesTransaction(cx)); state std::vector>> fValues(incompleteStagingKeys.size(), Never()); state int retries = 0; state UID randomID = deterministicRandom()->randomUniqueID(); - wait(delay(delayTime + deterministicRandom()->random01() * delayTime)); + wait(delay(deterministicRandom()->random01() * delayTime)); TraceEvent("FastRestoreApplierGetAndComputeStagingKeysStart", applierID) .detail("RandomUID", randomID) .detail("BatchIndex", batchIndex) @@ -234,10 +234,13 @@ ACTOR static Future getAndComputeStagingKeys( tr->setOption(FDBTransactionOptions::LOCK_AWARE); for (auto& key : incompleteStagingKeys) { fValues[i++] = tr->get(key.first); + cc->fetchKeys += 1; } wait(waitForAll(fValues)); + cc->fetchTxns += 1; break; } catch (Error& e) { + cc->fetchTxnRetries += 1; if (retries++ > incompleteStagingKeys.size()) { TraceEvent(SevWarnAlways, "GetAndComputeStagingKeys", applierID) .suppressFor(1.0) @@ -354,24 +357,23 @@ ACTOR static Future precomputeMutationsResult(Reference std::map::iterator> incompleteStagingKeys; std::map::iterator stagingKeyIter = batchData->stagingKeys.begin(); int numKeysInBatch = 0; - double delayTime = 0; // Start transactions at different time to avoid overwelming FDB. + double delayTime = 0; // Start transactions at different time to avoid overwhelming FDB. for (; stagingKeyIter != batchData->stagingKeys.end(); stagingKeyIter++) { if (!stagingKeyIter->second.hasBaseValue()) { incompleteStagingKeys.emplace(stagingKeyIter->first, stagingKeyIter); - batchData->counters.fetchKeys += 1; numKeysInBatch++; } if (numKeysInBatch == SERVER_KNOBS->FASTRESTORE_APPLIER_FETCH_KEYS_SIZE) { fGetAndComputeKeys.push_back( - getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex)); - delayTime += 0.1; + getAndComputeStagingKeys(incompleteStagingKeys, 0.1, cx, applierID, batchIndex, &batchData->counters)); + delayTime += 0.1; // TODO: Delete this because we may not need this numKeysInBatch = 0; incompleteStagingKeys.clear(); } } if (numKeysInBatch > 0) { - fGetAndComputeKeys.push_back( - getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, batchIndex)); + fGetAndComputeKeys.push_back(getAndComputeStagingKeys(incompleteStagingKeys, delayTime, cx, applierID, + batchIndex, &batchData->counters)); } TraceEvent("FastRestoreApplerPhasePrecomputeMutationsResult", applierID) @@ -400,9 +402,10 @@ ACTOR static Future precomputeMutationsResult(Reference // Apply mutations in batchData->stagingKeys [begin, end). ACTOR static Future applyStagingKeysBatch(std::map::iterator begin, std::map::iterator end, Database cx, - FlowLock* applyStagingKeysBatchLock, UID applierID) { - wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); - state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); + FlowLock* applyStagingKeysBatchLock, UID applierID, + ApplierBatchData::Counters* cc) { + // wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock? + // state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); state Reference tr(new ReadYourWritesTransaction(cx)); state int sets = 0; state int clears = 0; @@ -416,6 +419,7 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera while (iter != end) { if (iter->second.type == MutationRef::SetValue) { tr->set(iter->second.key, iter->second.val); + cc->appliedMutations += 1; TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID) .detail("SetKey", iter->second.key); sets++; @@ -428,6 +432,7 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera .detail("SubVersion", iter->second.version.sub); } tr->clear(singleKeyRange(iter->second.key)); + cc->appliedMutations += 1; TraceEvent(SevFRMutationInfo, "FastRestoreApplierPhaseApplyStagingKeysBatch", applierID) .detail("ClearKey", iter->second.key); clears++; @@ -449,8 +454,10 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera .detail("Sets", sets) .detail("Clears", clears); wait(tr->commit()); + cc->appliedTxns += 1; break; } catch (Error& e) { + cc->appliedTxnRetries += 1; wait(tr->onError(e)); } } @@ -470,14 +477,16 @@ ACTOR static Future applyStagingKeys(Reference batchData while (cur != batchData->stagingKeys.end()) { txnSize += cur->second.expectedMutationSize(); if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { - fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID)); + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID, + &batchData->counters)); begin = cur; txnSize = 0; } cur++; } if (begin != batchData->stagingKeys.end()) { - fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID)); + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID, + &batchData->counters)); } wait(waitForAll(fBatches)); diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 92c8c1f35f..9a911e3b81 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -258,15 +258,18 @@ struct ApplierBatchData : public ReferenceCounted { CounterCollection cc; Counter receivedBytes, receivedWeightedBytes, receivedMutations, receivedAtomicOps; Counter appliedWeightedBytes, appliedMutations, appliedAtomicOps; - Counter appliedTxns; - Counter fetchKeys; // number of keys to fetch from dest. FDB cluster. + Counter appliedTxns, appliedTxnRetries; + Counter fetchKeys, fetchTxns, fetchTxnRetries; // number of keys to fetch from dest. FDB cluster. + // TODO: Add the counter in applying phase Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex) : cc("ApplierBatch", applierInterfID.toString() + ":" + std::to_string(batchIndex)), receivedBytes("ReceivedBytes", cc), receivedMutations("ReceivedMutations", cc), receivedAtomicOps("ReceivedAtomicOps", cc), receivedWeightedBytes("ReceivedWeightedMutations", cc), appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc), - appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc), fetchKeys("FetchKeys", cc) {} + appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc), + appliedTxnRetries("AppliedTxnRetries", cc), fetchKeys("FetchKeys", cc), fetchTxns("FetchTxns", cc), + fetchTxnRetries("FetchTxnRetries", cc) {} } counters; void addref() { return ReferenceCounted::addref(); } diff --git a/fdbserver/RestoreRoleCommon.actor.cpp b/fdbserver/RestoreRoleCommon.actor.cpp index ac113924e6..07fd1d9fdf 100644 --- a/fdbserver/RestoreRoleCommon.actor.cpp +++ b/fdbserver/RestoreRoleCommon.actor.cpp @@ -139,6 +139,7 @@ ACTOR Future isSchedulable(Reference self, int actorBatch ACTOR Future traceProcessMetrics(Reference self, std::string role) { loop { + // TODO: Add node ID and batchIndex TraceEvent("FastRestoreTraceProcessMetrics") .detail("Role", role) .detail("Node", self->nodeID)