FastRestore:Pipeline multiple version batches

This commit is contained in:
Meng Xu 2020-01-15 17:24:01 -08:00
parent 441f3e2814
commit 8d3f3aa926
1 changed files with 10 additions and 0 deletions

View File

@ -250,12 +250,18 @@ ACTOR static Future<Version> processRestoreRequest(Reference<RestoreMasterData>
self->dumpVersionBatches(self->versionBatches);
ASSERT(self->batchIndex == 1); // versionBatchIndex starts at 1 because NotifiedVersion starts at 0
std::vector<Future<Void>> fBatches;
// TODO: Control how many batches can be processed in parallel. Avoid dead lock due to OOM on loaders
for (versionBatch = self->versionBatches.begin(); versionBatch != self->versionBatches.end(); versionBatch++) {
self->batch[self->batchIndex] = Reference<MasterBatchData>(new MasterBatchData());
// fBatches.push_back(
// distributeWorkloadPerVersionBatch(self, self->batchIndex, cx, request, versionBatch->second));
wait(distributeWorkloadPerVersionBatch(self, self->batchIndex, cx, request, versionBatch->second));
self->batchIndex++;
}
//wait(waitForAll(fBatches));
TraceEvent("FastRestore").detail("RestoreToVersion", request.targetVersion);
return request.targetVersion;
}
@ -360,9 +366,13 @@ ACTOR static Future<Void> distributeWorkloadPerVersionBatch(Reference<RestoreMas
// Parse log files and send mutations to appliers before we parse range files
// TODO: Allow loading both range and log files in parallel
ASSERT(batchData->samples.empty());
ASSERT(batchData->samplesSize < 1 && batchData->samplesSize > -1); // samplesSize should be 0
wait(loadFilesOnLoaders(batchData, self->loadersInterf, batchIndex, cx, request, versionBatch, false));
wait(loadFilesOnLoaders(batchData, self->loadersInterf, batchIndex, cx, request, versionBatch, true));
ASSERT(batchData->rangeToApplier.empty());
splitKeyRangeForAppliers(batchData, self->appliersInterf, batchIndex);
// Loaders should ensure log files' mutations sent to appliers before range files' mutations