From cc47d0e161b77267f780a5dd171966ba17d98444 Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Wed, 15 Nov 2017 22:38:31 -0800 Subject: [PATCH] Bug fix in restore dispatch, begin file was not being incremented. Removed try/catch because the inherited handleError() is better. --- fdbclient/FileBackupAgent.actor.cpp | 429 ++++++++++++++-------------- 1 file changed, 211 insertions(+), 218 deletions(-) diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index d5f0d7c6c3..7114b81af5 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -2132,264 +2132,257 @@ namespace fileBackup { ACTOR static Future _finish(Reference tr, Reference taskBucket, Reference futureBucket, Reference task) { state RestoreConfig restore(task); - try { - state Version beginVersion = Params.beginVersion().get(task); - state Reference onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); + state Version beginVersion = Params.beginVersion().get(task); + state Reference onDone = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]); - Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version)); - state int64_t remainingInBatch = Params.remainingInBatch().get(task); - state bool addingToExistingBatch = remainingInBatch > 0; + Void _ = wait(checkTaskVersion(tr->getDatabase(), task, name, version)); + state int64_t remainingInBatch = Params.remainingInBatch().get(task); + state bool addingToExistingBatch = remainingInBatch > 0; - // If not adding to an existing batch then update the apply mutations end version so the mutations from the - // previous batch can be applied. - if(!addingToExistingBatch) - restore.setApplyEndVersion(tr, beginVersion); + // If not adding to an existing batch then update the apply mutations end version so the mutations from the + // previous batch can be applied. 
+ if(!addingToExistingBatch) + restore.setApplyEndVersion(tr, beginVersion); - // Get the apply version lag - state int64_t applyLag = wait(restore.getApplyVersionLag(tr)); - state int64_t batchSize = Params.batchSize().get(task); + // Get the apply version lag + state int64_t applyLag = wait(restore.getApplyVersionLag(tr)); + state int64_t batchSize = Params.batchSize().get(task); - // If starting a new batch and the apply lag is too large then re-queue and wait - if(!addingToExistingBatch && applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) { - // Wait a small amount of time and then re-add this same task. - Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); - Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize, remainingInBatch)); + // If starting a new batch and the apply lag is too large then re-queue and wait + if(!addingToExistingBatch && applyLag > (BUGGIFY ? 1 : CLIENT_KNOBS->CORE_VERSIONSPERSECOND * 300)) { + // Wait a small amount of time and then re-add this same task. 
+ Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize, remainingInBatch)); - TraceEvent("FileRestoreDispatch") - .detail("UID", restore.getUid()) - .detail("BeginVersion", beginVersion) - .detail("ApplyLag", applyLag) - .detail("BatchSize", batchSize) - .detail("Decision", "too_far_behind") - .detail("TaskInstance", (uint64_t)this); + TraceEvent("FileRestoreDispatch") + .detail("UID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("BatchSize", batchSize) + .detail("Decision", "too_far_behind") + .detail("TaskInstance", (uint64_t)this); - Void _ = wait(taskBucket->finish(tr, task)); - return Void(); - } + Void _ = wait(taskBucket->finish(tr, task)); + return Void(); + } - state std::string beginFile = Params.beginFile().getOrDefault(task); - // Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each of which is 0 or more blocks). - state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE)); + state std::string beginFile = Params.beginFile().getOrDefault(task); + // Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each of which is 0 or more blocks). + state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(tr, {beginVersion, beginFile}, {}, CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE)); - // allPartsDone will be set once all block tasks in the current batch are finished. - state Reference allPartsDone; + // allPartsDone will be set once all block tasks in the current batch are finished. 
+ state Reference allPartsDone; - // If adding to existing batch then join the new block tasks to the existing batch future + // If adding to existing batch then join the new block tasks to the existing batch future + if(addingToExistingBatch) { + Key fKey = wait(restore.batchFuture().getD(tr)); + allPartsDone = Reference(new TaskFuture(futureBucket, fKey)); + } + else { + // Otherwise create a new future for the new batch + allPartsDone = futureBucket->future(tr); + restore.batchFuture().set(tr, allPartsDone->pack()); + // Set batch quota remaining to batch size + remainingInBatch = batchSize; + } + + // If there were no files to load then this batch is done and restore is almost done. + if(files.size() == 0) { + state Version restoreVersion = wait(restore.restoreVersion().getD(tr)); + + // If adding to existing batch then blocks could be in progress so create a new Dispatch task that waits for them to finish if(addingToExistingBatch) { - Key fKey = wait(restore.batchFuture().getD(tr)); - allPartsDone = Reference(new TaskFuture(futureBucket, fKey)); - } - else { - // Otherwise create a new future for the new batch - allPartsDone = futureBucket->future(tr); - restore.batchFuture().set(tr, allPartsDone->pack()); - // Set batch quota remaining to batch size - remainingInBatch = batchSize; - } - - // If there were no files to load then this batch is done and restore is almost done. 
- if(files.size() == 0) { - state Version restoreVersion = wait(restore.restoreVersion().getD(tr)); - - // If adding to existing batch then blocks could be in progress so create a new Dispatch task that waits for them to finish - if(addingToExistingBatch) { - Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone)); - - TraceEvent("FileRestoreDispatch") - .detail("UID", restore.getUid()) - .detail("BeginVersion", beginVersion) - .detail("BeginFile", Params.beginFile().get(task)) - .detail("BeginBlock", Params.beginBlock().get(task)) - .detail("RestoreVersion", restoreVersion) - .detail("ApplyLag", applyLag) - .detail("Decision", "end_of_final_batch") - .detail("TaskInstance", (uint64_t)this); - } - else if(beginVersion < restoreVersion) { - // If beginVersion is less than restoreVersion then do one more dispatch task to get there - Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize)); - - TraceEvent("FileRestoreDispatch") - .detail("UID", restore.getUid()) - .detail("BeginVersion", beginVersion) - .detail("BeginFile", Params.beginFile().get(task)) - .detail("BeginBlock", Params.beginBlock().get(task)) - .detail("RestoreVersion", restoreVersion) - .detail("ApplyLag", applyLag) - .detail("Decision", "apply_to_restore_version") - .detail("TaskInstance", (uint64_t)this); - } - else if(applyLag == 0) { - // If apply lag is 0 then we are done so create the completion task - Key _ = wait(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal())); - - TraceEvent("FileRestoreDispatch") - .detail("UID", restore.getUid()) - .detail("BeginVersion", beginVersion) - .detail("BeginFile", Params.beginFile().get(task)) - .detail("BeginBlock", Params.beginBlock().get(task)) - .detail("ApplyLag", applyLag) - .detail("Decision", "restore_complete") - .detail("TaskInstance", (uint64_t)this); - } else { - // Applying 
of mutations is not yet finished so wait a small amount of time and then re-add this same task. - Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); - Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize)); - - TraceEvent("FileRestoreDispatch") - .detail("UID", restore.getUid()) - .detail("BeginVersion", beginVersion) - .detail("ApplyLag", applyLag) - .detail("Decision", "apply_still_behind") - .detail("TaskInstance", (uint64_t)this); - } - - // If adding to existing batch then task is joined with a batch future so set done future - // Note that this must be done after joining at least one task with the batch future in case all other blockers already finished. - Future setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void(); - - Void _ = wait(taskBucket->finish(tr, task) && setDone); - return Void(); - } - - // Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE blocks per Dispatch task - // and target batchSize total per batch but a batch must end on a complete version boundary so exceed the limit if necessary - // to reach the end of a version of files. 
- state std::vector> addTaskFutures; - state Version endVersion = files[0].version; - state int blocksDispatched = 0; - state int64_t beginBlock = Params.beginBlock().getOrDefault(task); - state int i = 0; - - for(; i < files.size(); ++i) { - RestoreConfig::RestoreFile &f = files[i]; - - // Here we are "between versions" (prior to adding the first block of the first file of a new version) so this is an opportunity - // to end the current dispatch batch (which must end on a version boundary) if the batch size has been reached or exceeded - if(f.version != endVersion && remainingInBatch <= 0) { - // Next start will be at the first version after endVersion at the first file first block - ++endVersion; - beginFile = ""; - beginBlock = 0; - break; - } - - // Set the starting point for the next task in case we stop inside this file - endVersion = f.version; - beginFile = f.fileName; - - state int64_t j = beginBlock * f.blockSize; - // For each block of the file - for(; j < f.fileSize; j += f.blockSize) { - // Stop if we've reached the addtask limit - if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE) - break; - - if(f.isRange) { - addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, taskBucket, task, - f, j, std::min(f.blockSize, f.fileSize - j), - TaskCompletionKey::joinWith(allPartsDone))); - } - else { - addTaskFutures.push_back(RestoreLogDataTaskFunc::addTask(tr, taskBucket, task, - f, j, std::min(f.blockSize, f.fileSize - j), - TaskCompletionKey::joinWith(allPartsDone))); - } - - // Increment beginBlock for the file and total blocks dispatched for this task - ++beginBlock; - ++blocksDispatched; - --remainingInBatch; - } - - // Stop if we've reached the addtask limit - if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE) - break; - - // We just completed an entire file so the next task should start at the file after this one within endVersion (or later) - // if this iteration ends up being the last for this task - beginFile = 
beginFile; - beginBlock = 0; - - //TraceEvent("FileRestoreDispatchedFile").detail("UID", restore.getUid()).detail("FileName", fi.filename).detail("TaskInstance", (uint64_t)this); - } - - // If no blocks were dispatched then the next dispatch task should run now and be joined with the allPartsDone future - if(blocksDispatched == 0) { - std::string decision; - - // If no files were dispatched either then the batch size wasn't large enough to catch all of the files at the next lowest non-dispatched - // version, so increase the batch size. - if(i == 0) { - batchSize *= 2; - decision = "increased_batch_size"; - } - else - decision = "all_files_were_empty"; + Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone)); TraceEvent("FileRestoreDispatch") .detail("UID", restore.getUid()) .detail("BeginVersion", beginVersion) .detail("BeginFile", Params.beginFile().get(task)) .detail("BeginBlock", Params.beginBlock().get(task)) - .detail("EndVersion", endVersion) + .detail("RestoreVersion", restoreVersion) .detail("ApplyLag", applyLag) - .detail("BatchSize", batchSize) - .detail("Decision", decision) - .detail("TaskInstance", (uint64_t)this) - .detail("RemainingInBatch", remainingInBatch); + .detail("Decision", "end_of_final_batch") + .detail("TaskInstance", (uint64_t)this); + } + else if(beginVersion < restoreVersion) { + // If beginVersion is less than restoreVersion then do one more dispatch task to get there + Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, restoreVersion, "", 0, batchSize)); - Void _ = wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith((allPartsDone))))); - - // If adding to existing batch then task is joined with a batch future so set done future. 
- // Note that this must be done after joining at least one task with the batch future in case all other blockers already finished. - Future setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void(); + TraceEvent("FileRestoreDispatch") + .detail("UID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("BeginFile", Params.beginFile().get(task)) + .detail("BeginBlock", Params.beginBlock().get(task)) + .detail("RestoreVersion", restoreVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "apply_to_restore_version") + .detail("TaskInstance", (uint64_t)this); + } + else if(applyLag == 0) { + // If apply lag is 0 then we are done so create the completion task + Key _ = wait(RestoreCompleteTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal())); - Void _ = wait(setDone && taskBucket->finish(tr, task)); + TraceEvent("FileRestoreDispatch") + .detail("UID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("BeginFile", Params.beginFile().get(task)) + .detail("BeginBlock", Params.beginBlock().get(task)) + .detail("ApplyLag", applyLag) + .detail("Decision", "restore_complete") + .detail("TaskInstance", (uint64_t)this); + } else { + // Applying of mutations is not yet finished so wait a small amount of time and then re-add this same task. + Void _ = wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + Key _ = wait(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, beginVersion, "", 0, batchSize)); - return Void(); + TraceEvent("FileRestoreDispatch") + .detail("UID", restore.getUid()) + .detail("BeginVersion", beginVersion) + .detail("ApplyLag", applyLag) + .detail("Decision", "apply_still_behind") + .detail("TaskInstance", (uint64_t)this); } - // If adding to existing batch then task is joined with a batch future so set done future. 
+ // If adding to existing batch then task is joined with a batch future so set done future + // Note that this must be done after joining at least one task with the batch future in case all other blockers already finished. Future setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void(); - // Increment the number of blocks dispatched in the restore config - restore.filesBlocksDispatched().atomicOp(tr, blocksDispatched, MutationRef::Type::AddValue); + Void _ = wait(taskBucket->finish(tr, task) && setDone); + return Void(); + } - // If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we cannot end - // the batch here because we do not know if we got all of the files and blocks from the last version queued, so - // make sure remainingInBatch is at least 1. - if(beginFile.size() != 0) - remainingInBatch = std::max(1, remainingInBatch); + // Start moving through the file list and queuing up blocks. Only queue up to RESTORE_DISPATCH_ADDTASK_SIZE blocks per Dispatch task + // and target batchSize total per batch but a batch must end on a complete version boundary so exceed the limit if necessary + // to reach the end of a version of files. + state std::vector> addTaskFutures; + state Version endVersion = files[0].version; + state int blocksDispatched = 0; + state int64_t beginBlock = Params.beginBlock().getOrDefault(task); + state int i = 0; - // If more blocks need to be dispatched in this batch then add a follow-on task that is part of the allPartsDone group which will won't wait - // to run and will add more block tasks. 
- if(remainingInBatch > 0) - addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith(allPartsDone))); - else // Otherwise, add a follow-on task to continue after all previously dispatched blocks are done - addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone)); + for(; i < files.size(); ++i) { + RestoreConfig::RestoreFile &f = files[i]; - Void _ = wait(setDone && waitForAll(addTaskFutures) && taskBucket->finish(tr, task)); + // Here we are "between versions" (prior to adding the first block of the first file of a new version) so this is an opportunity + // to end the current dispatch batch (which must end on a version boundary) if the batch size has been reached or exceeded + if(f.version != endVersion && remainingInBatch <= 0) { + // Next start will be at the first version after endVersion at the first file first block + ++endVersion; + beginFile = ""; + beginBlock = 0; + break; + } + + // Set the starting point for the next task in case we stop inside this file + endVersion = f.version; + beginFile = f.fileName; + + state int64_t j = beginBlock * f.blockSize; + // For each block of the file + for(; j < f.fileSize; j += f.blockSize) { + // Stop if we've reached the addtask limit + if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE) + break; + + if(f.isRange) { + addTaskFutures.push_back(RestoreRangeTaskFunc::addTask(tr, taskBucket, task, + f, j, std::min(f.blockSize, f.fileSize - j), + TaskCompletionKey::joinWith(allPartsDone))); + } + else { + addTaskFutures.push_back(RestoreLogDataTaskFunc::addTask(tr, taskBucket, task, + f, j, std::min(f.blockSize, f.fileSize - j), + TaskCompletionKey::joinWith(allPartsDone))); + } + + // Increment beginBlock for the file and total blocks dispatched for this task + ++beginBlock; + 
++blocksDispatched; + --remainingInBatch; + } + + // Stop if we've reached the addtask limit + if(blocksDispatched == CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE) + break; + + // We just completed an entire file so the next task should start at the file after this one within endVersion (or later) + // if this iteration ends up being the last for this task + beginFile = beginFile + '\x00'; + beginBlock = 0; + + //TraceEvent("FileRestoreDispatchedFile").detail("UID", restore.getUid()).detail("FileName", fi.filename).detail("TaskInstance", (uint64_t)this); + } + + // If no blocks were dispatched then the next dispatch task should run now and be joined with the allPartsDone future + if(blocksDispatched == 0) { + std::string decision; + + // If no files were dispatched either then the batch size wasn't large enough to catch all of the files at the next lowest non-dispatched + // version, so increase the batch size. + if(i == 0) { + batchSize *= 2; + decision = "increased_batch_size"; + } + else + decision = "all_files_were_empty"; TraceEvent("FileRestoreDispatch") + .detail("UID", restore.getUid()) .detail("BeginVersion", beginVersion) .detail("BeginFile", Params.beginFile().get(task)) .detail("BeginBlock", Params.beginBlock().get(task)) .detail("EndVersion", endVersion) .detail("ApplyLag", applyLag) .detail("BatchSize", batchSize) - .detail("Decision", "dispatched_files") - .detail("FilesDispatched", i) - .detail("BlocksDispatched", blocksDispatched) + .detail("Decision", decision) .detail("TaskInstance", (uint64_t)this) .detail("RemainingInBatch", remainingInBatch); - } catch(Error &e) { - state Error err = e; - Void _ = wait(restore.logError(tr->getDatabase(), e, "RestoreDispatch: Unexpected error.", this)); - throw err; + Void _ = wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith((allPartsDone))))); + + // If adding to existing batch then task is joined with 
a batch future so set done future. + // Note that this must be done after joining at least one task with the batch future in case all other blockers already finished. + Future setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void(); + + Void _ = wait(setDone && taskBucket->finish(tr, task)); + + return Void(); } + // If adding to existing batch then task is joined with a batch future so set done future. + Future setDone = addingToExistingBatch ? onDone->set(tr, taskBucket) : Void(); + + // Increment the number of blocks dispatched in the restore config + restore.filesBlocksDispatched().atomicOp(tr, blocksDispatched, MutationRef::Type::AddValue); + + // If beginFile is not empty then we had to stop in the middle of a version (possibly within a file) so we cannot end + // the batch here because we do not know if we got all of the files and blocks from the last version queued, so + // make sure remainingInBatch is at least 1. + if(beginFile.size() != 0) + remainingInBatch = std::max(1, remainingInBatch); + + // If more blocks need to be dispatched in this batch then add a follow-on task that is part of the allPartsDone group which won't wait + // to run and will add more block tasks. 
+ if(remainingInBatch > 0) + addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, remainingInBatch, TaskCompletionKey::joinWith(allPartsDone))); + else // Otherwise, add a follow-on task to continue after all previously dispatched blocks are done + addTaskFutures.push_back(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, endVersion, beginFile, beginBlock, batchSize, 0, TaskCompletionKey::noSignal(), allPartsDone)); + + Void _ = wait(setDone && waitForAll(addTaskFutures) && taskBucket->finish(tr, task)); + + TraceEvent("FileRestoreDispatch") + .detail("BeginVersion", beginVersion) + .detail("BeginFile", Params.beginFile().get(task)) + .detail("BeginBlock", Params.beginBlock().get(task)) + .detail("EndVersion", endVersion) + .detail("ApplyLag", applyLag) + .detail("BatchSize", batchSize) + .detail("Decision", "dispatched_files") + .detail("FilesDispatched", i) + .detail("BlocksDispatched", blocksDispatched) + .detail("TaskInstance", (uint64_t)this) + .detail("RemainingInBatch", remainingInBatch); + return Void(); }