FR:Remove applyStagingKeysBatchLock flow lock
This commit is contained in:
parent
6da7e56e03
commit
a4aad591fd
|
@ -543,6 +543,7 @@ public:
|
|||
int64_t TIME_KEEPER_MAX_ENTRIES;
|
||||
|
||||
// Fast Restore
|
||||
// TODO: After 6.3, review FR knobs, remove unneeded ones and change default value
|
||||
int64_t FASTRESTORE_FAILURE_TIMEOUT;
|
||||
int64_t FASTRESTORE_HEARTBEAT_INTERVAL;
|
||||
double FASTRESTORE_SAMPLING_PERCENT;
|
||||
|
|
|
@ -489,8 +489,7 @@ ACTOR static Future<Void> shouldReleaseTransaction(double* targetMB, double* app
|
|||
|
||||
// Apply mutations in batchData->stagingKeys [begin, end).
|
||||
ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::iterator begin,
|
||||
std::map<Key, StagingKey>::iterator end, Database cx,
|
||||
FlowLock* applyStagingKeysBatchLock, UID applierID,
|
||||
std::map<Key, StagingKey>::iterator end, Database cx, UID applierID,
|
||||
ApplierBatchData::Counters* cc, double* appliedBytes,
|
||||
double* applyingDataBytes, double* targetMB,
|
||||
AsyncTrigger* releaseTxnTrigger) {
|
||||
|
@ -500,9 +499,7 @@ ACTOR static Future<Void> applyStagingKeysBatch(std::map<Key, StagingKey>::itera
|
|||
return Void();
|
||||
}
|
||||
wait(shouldReleaseTransaction(targetMB, applyingDataBytes, releaseTxnTrigger));
|
||||
// TODO: Remove applyStagingKeysBatchLock everywhere
|
||||
// wait(applyStagingKeysBatchLock->take(TaskPriority::RestoreApplierWriteDB)); // Q: Do we really need the lock?
|
||||
// state FlowLock::Releaser releaser(*applyStagingKeysBatchLock);
|
||||
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state int sets = 0;
|
||||
state int clears = 0;
|
||||
|
@ -589,10 +586,9 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
while (cur != batchData->stagingKeys.end()) {
|
||||
txnSize += cur->second.totalSize(); // should be consistent with receivedBytes accounting method
|
||||
if (txnSize > SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters, &batchData->appliedBytes,
|
||||
&batchData->applyingDataBytes, &batchData->targetWriteRateMB,
|
||||
&batchData->releaseTxnTrigger));
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, applierID, &batchData->counters,
|
||||
&batchData->appliedBytes, &batchData->applyingDataBytes,
|
||||
&batchData->targetWriteRateMB, &batchData->releaseTxnTrigger));
|
||||
batchData->totalBytesToWrite += txnSize;
|
||||
begin = cur;
|
||||
txnSize = 0;
|
||||
|
@ -601,10 +597,9 @@ ACTOR static Future<Void> applyStagingKeys(Reference<ApplierBatchData> batchData
|
|||
cur++;
|
||||
}
|
||||
if (begin != batchData->stagingKeys.end()) {
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, &batchData->applyStagingKeysBatchLock, applierID,
|
||||
&batchData->counters, &batchData->appliedBytes,
|
||||
&batchData->applyingDataBytes, &batchData->targetWriteRateMB,
|
||||
&batchData->releaseTxnTrigger));
|
||||
fBatches.push_back(applyStagingKeysBatch(begin, cur, cx, applierID, &batchData->counters,
|
||||
&batchData->appliedBytes, &batchData->applyingDataBytes,
|
||||
&batchData->targetWriteRateMB, &batchData->releaseTxnTrigger));
|
||||
batchData->totalBytesToWrite += txnSize;
|
||||
txnBatches++;
|
||||
}
|
||||
|
|
|
@ -247,7 +247,6 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
VersionedMutationsMap kvOps; // Mutations at each version
|
||||
std::map<Key, StagingKey> stagingKeys;
|
||||
std::set<StagingKeyRange> stagingKeyRanges;
|
||||
FlowLock applyStagingKeysBatchLock;
|
||||
|
||||
Future<Void> pollMetrics;
|
||||
|
||||
|
@ -288,7 +287,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
void delref() { return ReferenceCounted<ApplierBatchData>::delref(); }
|
||||
|
||||
explicit ApplierBatchData(UID nodeID, int batchIndex)
|
||||
: counters(this, nodeID, batchIndex), applyStagingKeysBatchLock(SERVER_KNOBS->FASTRESTORE_APPLYING_PARALLELISM),
|
||||
: counters(this, nodeID, batchIndex),
|
||||
targetWriteRateMB(SERVER_KNOBS->FASTRESTORE_WRITE_BW_MB / SERVER_KNOBS->FASTRESTORE_NUM_APPLIERS),
|
||||
totalBytesToWrite(-1), applyingDataBytes(0), vbState(ApplierVersionBatchState::NOT_INIT),
|
||||
receiveMutationReqs(0), receivedBytes(0), appliedBytes(0) {
|
||||
|
|
Loading…
Reference in New Issue