FastRestore:Loader:Use isSchedulable to guard OOM
And trigger delayed actors that are blocked on memory to recheck memory.
This commit is contained in:
parent
a354f6ffa2
commit
06495b90ae
|
@ -457,6 +457,10 @@ ACTOR static Future<Void> handleApplyToDBRequest(RestoreVersionBatchRequest req,
|
|||
self->finishedBatch.set(req.batchIndex);
|
||||
}
|
||||
}
|
||||
|
||||
if (self->delayedActors > 0) {
|
||||
self->checkMemory.trigger();
|
||||
}
|
||||
req.reply.send(RestoreCommonReply(self->id(), isDuplicated));
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -201,7 +201,11 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
.detail("BatchIndex", req.batchIndex)
|
||||
.detail("ProcessLoadParam", req.param.toString())
|
||||
.detail("NotProcessed", !paramExist)
|
||||
.detail("Processed", isReady);
|
||||
.detail("Processed", isReady)
|
||||
.detail("CurrentMemory", getSystemStatistics().processMemory);
|
||||
|
||||
wait(isSchedulable(self, req.batchIndex, __FUNCTION__);
|
||||
|
||||
if (batchData->processedFileParams.find(req.param) == batchData->processedFileParams.end()) {
|
||||
TraceEvent("FastRestoreLoadFile", self->id())
|
||||
.detail("BatchIndex", req.batchIndex)
|
||||
|
@ -226,6 +230,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Send buffered mutations to appliers.
|
||||
// Do not need to block on low memory usage because this actor should not increase memory usage.
|
||||
ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequest req,
|
||||
Reference<RestoreLoaderData> self) {
|
||||
state Reference<LoaderBatchData> batchData = self->batch[req.batchIndex];
|
||||
|
@ -745,6 +751,9 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
|
|||
if (self->finishedBatch.get() == req.batchIndex - 1) {
|
||||
self->finishedBatch.set(req.batchIndex);
|
||||
}
|
||||
if (self->delayedActors > 0) {
|
||||
self->checkMemory.trigger();
|
||||
}
|
||||
req.reply.send(RestoreCommonReply(self->id(), false));
|
||||
return Void();
|
||||
}
|
Loading…
Reference in New Issue