Bug fix in restore dispatch, begin file was not being incremented. Removed try/catch because the inherited handleError() is better.

This commit is contained in:
Stephen Atherton 2017-11-15 22:38:31 -08:00
parent ab0017f023
commit cc47d0e161
1 changed files with 211 additions and 218 deletions

View File

@ -2132,264 +2132,257 @@ namespace fileBackup {
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state RestoreConfig restore(task);
try {
state Version beginVersion = Params.beginVersion().get(task);
state Reference<TaskFuture> onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
state Version beginVersion = Params.beginVersion().get(task);
state Reference<TaskFuture> onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version));
state int64_t remainingInBatch = Params.remainingInBatch().get(task);
state bool addingToExistingBatch = remainingInBatch > 0;
Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version));
state int64_t remainingInBatch = Params.remainingInBatch().get(task);
state bool addingToExistingBatch = remainingInBatch > 0;
// If not adding to an existing batch then update the apply mutations end version so the mutations from the
// previous batch can be applied.
restore.setApplyEndVersion(tr, beginVersion);
// If not adding to an existing batch then update the apply mutations end version so the mutations from the
// previous batch can be applied.
restore.setApplyEndVersion(tr, beginVersion);
// Get the apply version lag
state int64_t applyLag = wait(restore.getApplyVersionLag(tr));
state int64_t batchSize = Params.batchSize().get(task);
// Get the apply version lag
state int64_t applyLag = wait(restore.getApplyVersionLag(tr));
state int64_t batchSize = Params.batchSize().get(task);
// If starting a new batch and the apply lag is too large then re-queue and wait
if(!addingToExistingBatch && applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) {
// Wait a small amount of time and then re-add this same task.
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize, remainingInBatch));
// If starting a new batch and the apply lag is too large then re-queue and wait
if(!addingToExistingBatch && applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) {
// Wait a small amount of time and then re-add this same task.
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize, remainingInBatch));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", "too_far_behind")
.detail("TaskInstance", (uint64_t)this);
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", "too_far_behind")
.detail("TaskInstance", (uint64_t)this);
Void _ = wait(taskBucket->finish(tr, task));
return Void();
Void _ = wait(taskBucket->finish(tr, task));
return Void();
state std::string beginFile = Params.beginFile().getOrDefault(task);
// Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each of which is 0 or more blocks).
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE));
state std::string beginFile = Params.beginFile().getOrDefault(task);
// Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each of which is 0 or more blocks).
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE));
// allPartsDone will be set once all block tasks in the current batch are finished.
state Reference<TaskFuture> allPartsDone;
// allPartsDone will be set once all block tasks in the current batch are finished.
state Reference<TaskFuture> allPartsDone;
// If adding to existing batch then join the new block tasks to the existing batch future
// If adding to existing batch then join the new block tasks to the existing batch future
if(addingToExistingBatch) {
Key fKey = wait(restore.batchFuture().getD(tr));
allPartsDone = Reference<TaskFuture>(new TaskFuture(futureBucket, fKey));
else {
// Otherwise create a new future for the new batch
allPartsDone = futureBucket->future(tr);
restore.batchFuture().set(tr, allPartsDone->pack());
// Set batch quota remaining to batch size
remainingInBatch = batchSize;
// If there were no files to load then this batch is done and restore is almost done.
if(files.size() == 0) {
state Version restoreVersion = wait(restore.restoreVersion().getD(tr));
// If adding to existing batch then blocks could be in progress so create a new Dispatch task that waits for them to finish
if(addingToExistingBatch) {
Key fKey = wait(restore.batchFuture().getD(tr));
allPartsDone = Reference<TaskFuture>(new TaskFuture(futureBucket, fKey));
else {
// Otherwise create a new future for the new batch
allPartsDone = futureBucket->future(tr);
restore.batchFuture().set(tr, allPartsDone->pack());
// Set batch quota remaining to batch size
remainingInBatch = batchSize;
// If there were no files to load then this batch is done and restore is almost done.
if(files.size() == 0) {
state Version restoreVersion = wait(restore.restoreVersion().getD(tr));
// If adding to existing batch then blocks could be in progress so create a new Dispatch task that waits for them to finish
if(addingToExistingBatch) {
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "end_of_final_batch")
.detail("TaskInstance", (uint64_t)this);
else if(beginVersion < restoreVersion) {
// If beginVersion is less than restoreVersion then do one more dispatch task to get there
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "apply_to_restore_version")
.detail("TaskInstance", (uint64_t)this);
else if(applyLag == 0) {
// If apply lag is 0 then we are done so create the completion task
Key _ = wait(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("ApplyLag", applyLag)
.detail("Decision", "restore_complete")
.detail("TaskInstance", (uint64_t)this);
} else {
// Applying of mutations is not yet finished so wait a small amount of time and then re-add this same task.
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "apply_still_behind")
.detail("TaskInstance", (uint64_t)this);
// If adding to existing batch then task is joined with a batch future so set done future
// Note that this must be done after joining at least one task with the batch future in case all other blockers already finished.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
Void _ = wait(taskBucket->finish(tr, task) && setDone);
return Void();
// Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE blocks per Dispatch task
// and target batchSize total per batch but a batch must end on a complete version boundary so exceed the limit if necessary
// to reach the end of a version of files.
state std::vector<Future<Key>> addTaskFutures;
state Version endVersion = files[0].version;
state int blocksDispatched = 0;
state int64_t beginBlock = Params.beginBlock().getOrDefault(task);
state int i = 0;
for(; i < files.size(); ++i) {
RestoreConfig::RestoreFile &f = files[i];
// Here we are "between versions" (prior to adding the first block of the first file of a new version) so this is an opportunity
// to end the current dispatch batch (which must end on a version boundary) if the batch size has been reached or exceeded
if(f.version != endVersion && remainingInBatch <= 0) {
// Next start will be at the first version after endVersion at the first file first block
beginFile = "";
beginBlock = 0;
// Set the starting point for the next task in case we stop inside this file
endVersion = f.version;
beginFile = f.fileName;
state int64_t j = beginBlock * f.blockSize;
// For each block of the file
for(; j < f.fileSize; j += f.blockSize) {
// Stop if we've reached the addtask limit
if(f.isRange) {
addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
else {
addTaskFutures.push_back(RestoreLogDataTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
// Increment beginBlock for the file and total blocks dispatched for this task
// Stop if we've reached the addtask limit
// We just completed an entire file so the next task should start at the file after this one within endVersion (or later)
// if this iteration ends up being the last for this task
beginFile = beginFile;
beginBlock = 0;
//TraceEvent("FileRestoreDispatchedFile").detail("UID", restore.getUid()).detail("FileName", fi.filename).detail("TaskInstance", (uint64_t)this);
// If no blocks were dispatched then the next dispatch task should run now and be joined with the allPartsDone future
if(blocksDispatched == 0) {
std::string decision;
// If no files were dispatched either then the batch size wasn't large enough to catch all of the files at the next lowest non-dispatched
// version, so increase the batch size.
if(i == 0) {
batchSize *= 2;
decision = "increased_batch_size";
decision = "all_files_were_empty";
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", decision)
.detail("TaskInstance", (uint64_t)this)
.detail("RemainingInBatch", remainingInBatch);
.detail("Decision", "end_of_final_batch")
.detail("TaskInstance", (uint64_t)this);
else if(beginVersion < restoreVersion) {
// If beginVersion is less than restoreVersion then do one more dispatch task to get there
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize));
Void _ = wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith((allPartsDone)))));
// If adding to existing batch then task is joined with a batch future so set done future.
// Note that this must be done after joining at least one task with the batch future in case all other blockers already finished.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("RestoreVersion", restoreVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "apply_to_restore_version")
.detail("TaskInstance", (uint64_t)this);
else if(applyLag == 0) {
// If apply lag is 0 then we are done so create the completion task
Key _ = wait(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal()));
Void _ = wait(setDone && taskBucket->finish(tr, task));
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("ApplyLag", applyLag)
.detail("Decision", "restore_complete")
.detail("TaskInstance", (uint64_t)this);
} else {
// Applying of mutations is not yet finished so wait a small amount of time and then re-add this same task.
Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize));
return Void();
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("ApplyLag", applyLag)
.detail("Decision", "apply_still_behind")
.detail("TaskInstance", (uint64_t)this);
// If adding to existing batch then task is joined with a batch future so set done future.
// If adding to existing batch then task is joined with a batch future so set done future
// Note that this must be done after joining at least one task with the batch future in case all other blockers already finished.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
// Increment the number of blocks dispatched in the restore config
restore.filesBlocksDispatched().atomicOp(tr, blocksDispatched, MutationRef::Type::AddValue);
Void _ = wait(taskBucket->finish(tr, task) && setDone);
return Void();
// If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we cannot end
// the batch here because we do not know if we got all of the files and blocks from the last version queued, so
// make sure remainingInBatch is at least 1.
if(beginFile.size() != 0)
remainingInBatch = std::max<int64_t>(1, remainingInBatch);
// Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE blocks per Dispatch task
// and target batchSize total per batch but a batch must end on a complete version boundary so exceed the limit if necessary
// to reach the end of a version of files.
state std::vector<Future<Key>> addTaskFutures;
state Version endVersion = files[0].version;
state int blocksDispatched = 0;
state int64_t beginBlock = Params.beginBlock().getOrDefault(task);
state int i = 0;
// If more blocks need to be dispatched in this batch then add a follow-on task that is part of the allPartsDone group which will won't wait
// to run and will add more block tasks.
if(remainingInBatch > 0)
addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith(allPartsDone)));
else // Otherwise, add a follow-on task to continue after all previously dispatched blocks are done
addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone));
for(; i < files.size(); ++i) {
RestoreConfig::RestoreFile &f = files[i];
Void _ = wait(setDone && waitForAll(addTaskFutures) && taskBucket->finish(tr, task));
// Here we are "between versions" (prior to adding the first block of the first file of a new version) so this is an opportunity
// to end the current dispatch batch (which must end on a version boundary) if the batch size has been reached or exceeded
if(f.version != endVersion && remainingInBatch <= 0) {
// Next start will be at the first version after endVersion at the first file first block
beginFile = "";
beginBlock = 0;
// Set the starting point for the next task in case we stop inside this file
endVersion = f.version;
beginFile = f.fileName;
state int64_t j = beginBlock * f.blockSize;
// For each block of the file
for(; j < f.fileSize; j += f.blockSize) {
// Stop if we've reached the addtask limit
if(f.isRange) {
addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
else {
addTaskFutures.push_back(RestoreLogDataTaskFunc::addTask(tr, taskBucket, task,
f, j, std::min<int64_t>(f.blockSize, f.fileSize - j),
// Increment beginBlock for the file and total blocks dispatched for this task
// Stop if we've reached the addtask limit
// We just completed an entire file so the next task should start at the file after this one within endVersion (or later)
// if this iteration ends up being the last for this task
beginFile = beginFile + '\x00';
beginBlock = 0;
//TraceEvent("FileRestoreDispatchedFile").detail("UID", restore.getUid()).detail("FileName", fi.filename).detail("TaskInstance", (uint64_t)this);
// If no blocks were dispatched then the next dispatch task should run now and be joined with the allPartsDone future
if(blocksDispatched == 0) {
std::string decision;
// If no files were dispatched either then the batch size wasn't large enough to catch all of the files at the next lowest non-dispatched
// version, so increase the batch size.
if(i == 0) {
batchSize *= 2;
decision = "increased_batch_size";
decision = "all_files_were_empty";
.detail("UID", restore.getUid())
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", "dispatched_files")
.detail("FilesDispatched", i)
.detail("BlocksDispatched", blocksDispatched)
.detail("Decision", decision)
.detail("TaskInstance", (uint64_t)this)
.detail("RemainingInBatch", remainingInBatch);
} catch(Error &e) {
state Error err = e;
Void _ = wait(restore.logError(tr->getDatabase(), e, "RestoreDispatch: Unexpected error.", this));
throw err;
Void _ = wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith((allPartsDone)))));
// If adding to existing batch then task is joined with a batch future so set done future.
// Note that this must be done after joining at least one task with the batch future in case all other blockers already finished.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
Void _ = wait(setDone && taskBucket->finish(tr, task));
return Void();
// If adding to existing batch then task is joined with a batch future so set done future.
Future<Void> setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void();
// Increment the number of blocks dispatched in the restore config
restore.filesBlocksDispatched().atomicOp(tr, blocksDispatched, MutationRef::Type::AddValue);
// If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we cannot end
// the batch here because we do not know if we got all of the files and blocks from the last version queued, so
// make sure remainingInBatch is at least 1.
if(beginFile.size() != 0)
remainingInBatch = std::max<int64_t>(1, remainingInBatch);
// If more blocks need to be dispatched in this batch then add a follow-on task that is part of the allPartsDone group which will won't wait
// to run and will add more block tasks.
if(remainingInBatch > 0)
addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith(allPartsDone)));
else // Otherwise, add a follow-on task to continue after all previously dispatched blocks are done
addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone));
Void _ = wait(setDone && waitForAll(addTaskFutures) && taskBucket->finish(tr, task));
.detail("BeginVersion", beginVersion)
.detail("BeginFile", Params.beginFile().get(task))
.detail("BeginBlock", Params.beginBlock().get(task))
.detail("EndVersion", endVersion)
.detail("ApplyLag", applyLag)
.detail("BatchSize", batchSize)
.detail("Decision", "dispatched_files")
.detail("FilesDispatched", i)
.detail("BlocksDispatched", blocksDispatched)
.detail("TaskInstance", (uint64_t)this)
.detail("RemainingInBatch", remainingInBatch);
return Void();