diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index b8e730d01d..75ee2863c4 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -543,6 +543,7 @@ public: int64_t TIME_KEEPER_MAX_ENTRIES; // Fast Restore + // TODO: After 6.3, review FR knobs, remove unneeded ones and change default value int64_t FASTRESTORE_FAILURE_TIMEOUT; int64_t FASTRESTORE_HEARTBEAT_INTERVAL; double FASTRESTORE_SAMPLING_PERCENT; diff --git a/fdbserver/RestoreApplier.actor.cpp b/fdbserver/RestoreApplier.actor.cpp index fcf6c73322..c5d45cc3cd 100644 --- a/fdbserver/RestoreApplier.actor.cpp +++ b/fdbserver/RestoreApplier.actor.cpp @@ -489,8 +489,7 @@ ACTOR static Future shouldReleaseTransaction(double* targetMB, double* app // Apply mutations in batchData->stagingKeys [begin, end). ACTOR static Future applyStagingKeysBatch(std::map::iterator begin, - std::map::iterator end, Database cx, - FlowLock* applyStagingKeysBatchLock, UID applierID, + std::map::iterator end, Database cx, UID applierID, ApplierBatchData::Counters* cc, double* appliedBytes, double* applyingDataBytes, double* targetMB, AsyncTrigger* releaseTxnTrigger) { @@ -500,9 +499,7 @@ ACTOR static Future applyStagingKeysBatch(std::map::itera return Void(); } wait(shouldReleaseTransaction(targetMB, applyingDataBytes, releaseTxnTrigger)); - // TODO: Remove applyStagingKeysBatchLock everywhere - // wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock? - // state FlowLock::Releaser releaser(*applyStagingKeysBatchLock); + state Reference tr(new ReadYourWritesTransaction(cx)); state int sets = 0; state int clears = 0; @@ -589,10 +586,9 @@ ACTOR static Future applyStagingKeys(Reference batchData while (cur != batchData->stagingKeys.end()) { txnSize += cur->second.totalSize(); // should be consistent with receivedBytes accounting method if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) { - fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID, - &batchData->counters, &batchData->appliedBytes, - &batchData->applyingDataBytes, &batchData->targetWriteRateMB, - &batchData->releaseTxnTrigger)); + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, applierID, &batchData->counters, + &batchData->appliedBytes, &batchData->applyingDataBytes, + &batchData->targetWriteRateMB, &batchData->releaseTxnTrigger)); batchData->totalBytesToWrite += txnSize; begin = cur; txnSize = 0; @@ -601,10 +597,9 @@ ACTOR static Future applyStagingKeys(Reference batchData cur++; } if (begin != batchData->stagingKeys.end()) { - fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID, - &batchData->counters, &batchData->appliedBytes, - &batchData->applyingDataBytes, &batchData->targetWriteRateMB, - &batchData->releaseTxnTrigger)); + fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, applierID, &batchData->counters, + &batchData->appliedBytes, &batchData->applyingDataBytes, + &batchData->targetWriteRateMB, &batchData->releaseTxnTrigger)); batchData->totalBytesToWrite += txnSize; txnBatches++; } diff --git a/fdbserver/RestoreApplier.actor.h b/fdbserver/RestoreApplier.actor.h index 1ccf920f13..d9966b301b 100644 --- a/fdbserver/RestoreApplier.actor.h +++ b/fdbserver/RestoreApplier.actor.h @@ -247,7 +247,6 @@ struct ApplierBatchData : public ReferenceCounted { VersionedMutationsMap kvOps; // Mutations at each version std::map stagingKeys; std::set stagingKeyRanges; - FlowLock applyStagingKeysBatchLock; Future pollMetrics; @@ -288,7 +287,7 @@ struct ApplierBatchData : public ReferenceCounted { void delref() { return ReferenceCounted::delref(); } explicit ApplierBatchData(UID nodeID, int batchIndex) - : counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM), + : counters(this, nodeID, batchIndex), targetWriteRateMB(SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS), totalBytesToWrite(-1), applyingDataBytes(0), vbState(ApplierVersionBatchState::NOT_INIT), receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {