FastRestoreLoader:Clean batchData when a version batch finishes

Add sanity check for each loader phase to ensure an older version batch
will not be executed once a version batch has been marked as finished.
This commit is contained in:
Meng Xu 2020-05-04 11:31:39 -07:00
parent 0ba1551116
commit 68bcecd7d4
3 changed files with 11 additions and 0 deletions

View File

@ -336,6 +336,8 @@ ACTOR Future<Void> handleLoadFileRequest(RestoreLoadFileRequest req, Reference<R
.detail("NotProcessed", !paramExist)
.detail("Processed", isReady)
.detail("CurrentMemory", getSystemStatistics().processMemory);
// Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
ASSERT(self->finishedBatch.get() < req.batchIndex);
wait(isSchedulable(self, req.batchIndex, __FUNCTION__));
@ -376,6 +378,8 @@ ACTOR Future<Void> handleSendMutationsRequest(RestoreSendMutationsToAppliersRequ
.detail("BatchIndex", req.batchIndex)
.detail("UseRangeFile", req.useRangeFile)
.detail("LoaderSendStatus", batchStatus->toString());
// Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
ASSERT(self->finishedBatch.get() < req.batchIndex);
// Ensure each file is sent exactly once by using batchStatus->sendAllLogs and batchStatus->sendAllRanges
if (!req.useRangeFile) {
@ -945,6 +949,9 @@ ACTOR Future<Void> handleFinishVersionBatchRequest(RestoreVersionBatchRequest re
wait(self->finishedBatch.whenAtLeast(req.batchIndex - 1));
if (self->finishedBatch.get() == req.batchIndex - 1) {
self->finishedBatch.set(req.batchIndex);
// Clean up batchData
self->batch.erase(req.batchIndex);
self->status.erase(req.batchIndex);
}
if (self->delayedActors > 0) {
self->checkMemory.trigger();

View File

@ -177,6 +177,7 @@ struct RestoreLoaderData : RestoreRoleData, public ReferenceCounted<RestoreLoade
void resetPerRestoreRequest() {
batch.clear();
status.clear();
finishedBatch = NotifiedVersion(0);
}
void initBackupContainer(Key url) {

View File

@ -57,6 +57,9 @@ ACTOR Future<Void> handleInitVersionBatchRequest(RestoreVersionBatchRequest req,
.detail("BatchIndex", req.batchIndex)
.detail("Role", getRoleStr(self->role))
.detail("VersionBatchNotifiedVersion", self->versionBatchId.get());
// Loader destroy batchData once the batch finishes and self->finishedBatch.set(req.batchIndex);
ASSERT(self->finishedBatch.get() < req.batchIndex);
// batchId is continuous. (req.batchIndex-1) is the id of the just finished batch.
wait(self->versionBatchId.whenAtLeast(req.batchIndex - 1));