FastRestore:Lower priority for RestoreApplierReceiveMutations actor
This commit is contained in:
parent
78c45c1200
commit
e57dba00bd
|
@ -66,7 +66,9 @@ ACTOR Future<Void> restoreApplierCore(RestoreApplierInterface applierInterf, int
|
|||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(applierInterf.applyToDB.getFuture())) {
|
||||
requestTypeStr = "applyToDB";
|
||||
actors.add(handleApplyToDBRequest(req, self, cx));
|
||||
actors.add(handleApplyToDBRequest(
|
||||
req, self, cx)); // TODO: Check how FDB uses TaskPriority for ACTORS. We may need to add
|
||||
// priority here to avoid requests at later VB block requests at earlier VBs
|
||||
}
|
||||
when(RestoreVersionBatchRequest req = waitNext(applierInterf.initVersionBatch.getFuture())) {
|
||||
requestTypeStr = "initVersionBatch";
|
||||
|
@ -113,14 +115,23 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
// Assume: processedFileState[req.asset] will not be erased while the actor is active.
|
||||
// Note: Insert new items into processedFileState will not invalidate the reference.
|
||||
state NotifiedVersion& curMsgIndex = batchData->processedFileState[req.asset];
|
||||
state bool printTrace = false;
|
||||
|
||||
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
wait(delay(0.0, TaskPriority::RestoreApplierReceiveMutations));
|
||||
|
||||
batchData->receiveMutationReqs += 1;
|
||||
// Trace when the receive phase starts at a VB and when it finishes.
|
||||
// This can help check if receiveMutations block applyMutation phase. If so, we need more sophisticated schedule to
|
||||
// ensure priority execution
|
||||
printTrace = (batchData->receiveMutationReqs % 100 == 1);
|
||||
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutations", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("RestoreAssetMesssageIndex", curMsgIndex.get())
|
||||
.detail("Request", req.toString())
|
||||
.detail("CurrentMemory", getSystemStatistics().processMemory)
|
||||
.detail("PreviousVersionBatchState", batchData->vbState.get());
|
||||
.detail("PreviousVersionBatchState", batchData->vbState.get())
|
||||
.detail("ReceiveMutationRequests", batchData->receiveMutationReqs);
|
||||
|
||||
wait(isSchedulable(self, req.batchIndex, __FUNCTION__));
|
||||
|
||||
|
@ -159,7 +170,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
}
|
||||
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
TraceEvent(SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
||||
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreApplierPhaseReceiveMutationsDone", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("RestoreAsset", req.asset.toString())
|
||||
.detail("ProcessedMessageIndex", curMsgIndex.get())
|
||||
|
@ -169,7 +180,7 @@ ACTOR static Future<Void> handleSendMutationVectorRequest(RestoreSendVersionedMu
|
|||
|
||||
// Clear all ranges in input ranges
|
||||
ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRangeRef>> ranges, double delayTime,
|
||||
Database cx, UID applierID, int batchIndex) {
|
||||
Database cx, UID applierID, int batchIndex, ApplierBatchData* cc) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state int retries = 0;
|
||||
state double numOps = 0;
|
||||
|
@ -186,6 +197,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
|||
debugFRMutation("FastRestoreApplierApplyClearRangeMutation", 0,
|
||||
MutationRef(MutationRef::ClearRange, range.begin, range.end));
|
||||
tr->clear(range);
|
||||
cc->clearOps += 1;
|
||||
++numOps;
|
||||
if (numOps >= SERVER_KNOBS->FASTRESTORE_TXN_CLEAR_MAX) {
|
||||
TraceEvent(SevWarn, "FastRestoreApplierClearRangeMutationsTooManyClearsInTxn")
|
||||
|
@ -196,6 +208,7 @@ ACTOR static Future<Void> applyClearRangeMutations(Standalone<VectorRef<KeyRange
|
|||
}
|
||||
}
|
||||
wait(tr->commit());
|
||||
cc->clearTxns += 1;
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
retries++;
|
||||
|
@ -307,14 +320,16 @@ ACTOR static Future<Void> precomputeMutationsResult(Reference<ApplierBatchData>
|
|||
clearRanges.push_back_deep(clearRanges.arena(), range);
|
||||
curTxnSize += range.expectedSize();
|
||||
if (curTxnSize >= SERVER_KNOBS->FASTRESTORE_TXN_BATCH_MAX_BYTES) {
|
||||
fClearRanges.push_back(applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex));
|
||||
fClearRanges.push_back(
|
||||
applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex, &batchData->counters));
|
||||
delayTime += SERVER_KNOBS->FASTRESTORE_TXN_DELAY_OFFSET;
|
||||
clearRanges = Standalone<VectorRef<KeyRangeRef>>();
|
||||
curTxnSize = 0;
|
||||
}
|
||||
}
|
||||
if (curTxnSize > 0) {
|
||||
fClearRanges.push_back(applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex));
|
||||
fClearRanges.push_back(
|
||||
applyClearRangeMutations(clearRanges, delayTime, cx, applierID, batchIndex, &batchData->counters));
|
||||
}
|
||||
|
||||
// Apply range mutations (i.e., clearRange) to stagingKeyRanges
|
||||
|
|
|
@ -253,6 +253,8 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
|
||||
RoleVersionBatchState vbState;
|
||||
|
||||
long receiveMutationReqs;
|
||||
|
||||
// Status counters
|
||||
struct Counters {
|
||||
CounterCollection cc;
|
||||
|
@ -260,6 +262,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
Counter appliedWeightedBytes, appliedMutations, appliedAtomicOps;
|
||||
Counter appliedTxns, appliedTxnRetries;
|
||||
Counter fetchKeys, fetchTxns, fetchTxnRetries; // number of keys to fetch from dest. FDB cluster.
|
||||
Counter clearOps, clearTxns;
|
||||
// TODO: Add the counter in applying phase
|
||||
|
||||
Counters(ApplierBatchData* self, UID applierInterfID, int batchIndex)
|
||||
|
@ -269,7 +272,7 @@ struct ApplierBatchData : public ReferenceCounted<ApplierBatchData> {
|
|||
appliedWeightedBytes("AppliedWeightedBytes", cc), appliedMutations("AppliedMutations", cc),
|
||||
appliedAtomicOps("AppliedAtomicOps", cc), appliedTxns("AppliedTxns", cc),
|
||||
appliedTxnRetries("AppliedTxnRetries", cc), fetchKeys("FetchKeys", cc), fetchTxns("FetchTxns", cc),
|
||||
fetchTxnRetries("FetchTxnRetries", cc) {}
|
||||
fetchTxnRetries("FetchTxnRetries", cc), clearOps("ClearOps", cc), clearTxns("ClearTxns", cc) {}
|
||||
} counters;
|
||||
|
||||
void addref() { return ReferenceCounted<ApplierBatchData>::addref(); }
|
||||
|
|
|
@ -348,11 +348,15 @@ ACTOR Future<Void> _processLoadingParam(KeyRangeMap<Version>* pRangeVersions, Lo
|
|||
ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<RestoreLoaderData> self) {
|
||||
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
|
||||
state bool isDuplicated = true;
|
||||
state bool printTrace = false;
|
||||
ASSERT(batchData.isValid());
|
||||
bool paramExist = batchData->processedFileParams.find(req.param) != batchData->processedFileParams.end();
|
||||
bool isReady = paramExist ? batchData->processedFileParams[req.param].isReady() : false;
|
||||
|
||||
TraceEvent(SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFile", self->id())
|
||||
batchData->loadFileReqs += 1;
|
||||
printTrace = (batchData->loadFileReqs % 10 == 1);
|
||||
// TODO: Make priority lower than sendMutation priority.
|
||||
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFile", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("ProcessLoadParam", req.param.toString())
|
||||
.detail("NotProcessed", !paramExist)
|
||||
|
@ -381,7 +385,7 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
wait(it->second); // wait on the processing of the req.param.
|
||||
|
||||
req.reply.send(RestoreLoadFileReply(req.param, batchData->sampleMutations[req.param], isDuplicated));
|
||||
TraceEvent(SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
|
||||
TraceEvent(printTrace ? SevInfo : SevFRDebugInfo, "FastRestoreLoaderPhaseLoadFileDone", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("ProcessLoadParam", req.param.toString());
|
||||
// TODO: clear self->sampleMutations[req.param] memory to save memory on loader
|
||||
|
|
|
@ -77,6 +77,8 @@ struct LoaderBatchData : public ReferenceCounted<LoaderBatchData> {
|
|||
|
||||
LoaderVersionBatchState vbState;
|
||||
|
||||
long loadFileReqs;
|
||||
|
||||
// Status counters
|
||||
struct Counters {
|
||||
CounterCollection cc;
|
||||
|
|
Loading…
Reference in New Issue