diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index 58957ce8bd..7fbde3d4e2 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -446,6 +446,29 @@ FileBackupAgent::FileBackupAgent() namespace fileBackup { + // Try to save and extend task repeatedly until it fails or f is ready (or throws) + // In case the task was started or saveAndExtend'd recently, firstSaveAndExtendTimestamp can be used to indicate + // when the first saveAndExtend should be done. + ACTOR static Future saveAndExtendIncrementally(Database cx, Reference taskBucket, Reference task, Future f, double firstSaveAndExtendTimestamp = 0) { + // delaySeconds is half of the taskBucket task timeout. + state double delaySeconds = 0.5 * taskBucket->getTimeoutSeconds(); + state Future timeout = delayUntil(firstSaveAndExtendTimestamp); + loop { + choose { + when(Void _ = wait(f)) { + break; + } + when(Void _ = wait(timeout)) { + bool keepGoing = wait(taskBucket->saveAndExtend(cx, task)); + if(!keepGoing) + throw timed_out(); + timeout = delay(delaySeconds); + } + } + } + return Void(); + } + ACTOR static Future writeString(Reference file, Standalone s, int64_t *pOffset) { state uint32_t lenBuf = bigEndian32((uint32_t)s.size()); Void _ = wait(file->write(&lenBuf, sizeof(lenBuf), *pOffset)); @@ -1088,13 +1111,12 @@ namespace fileBackup { if (outFile){ TEST(true); // Backup range task wrote multiple versions - bool isFinished = wait(taskBucket->isFinished(cx, task)); - if (isFinished){ - Void _ = wait(truncateCloseFile(cx, task->params[FileBackupAgent::keyErrors], task->params[FileBackupAgent::keyBackupContainer].toString(), outFileName, outFile)); - return Void(); - } state Key nextKey = keyAfter(lastKey); - Void _ = wait(endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, nextKey, outVersion)); + Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task, + endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, nextKey, outVersion), + timeout + ) + ); // outFileName has now been modified to be the file's final (non temporary) name. bool keepGoing = wait(recordRangeFile(backup, cx, task, taskBucket, KeyRangeRef(beginKey, nextKey), outFileName)); @@ -1138,13 +1160,12 @@ namespace fileBackup { if (err.code() == error_code_end_of_stream) { if (outFile) { - bool isFinished = wait(taskBucket->isFinished(cx, task)); - if (isFinished){ - Void _ = wait(truncateCloseFile(cx, task->params[FileBackupAgent::keyErrors], task->params[FileBackupAgent::keyBackupContainer].toString(), outFileName, outFile)); - return Void(); - } try { - Void _ = wait(endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, endKey, outVersion)); + Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task, + endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, endKey, outVersion), + timeout + ) + ); // outFileName has now been modified to be the file's final (non temporary) name. bool keepGoing = wait(recordRangeFile(backup, cx, task, taskBucket, KeyRangeRef(beginKey, endKey), outFileName)); @@ -1390,7 +1411,11 @@ namespace fileBackup { task->params[BackupLogRangeTaskFunc::keyFileSize] = BinaryWriter::toValue(logFile.offset, Unversioned()); std::string logFileName = FileBackupAgent::getLogFilename(beginVersion, endVersion, logFile.offset, logFile.blockSize); - Void _ = wait(endLogFile(cx, taskBucket, futureBucket, task, outFile, tempFileName, logFileName, logFile.offset)); + Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task, + endLogFile(cx, task, outFile, tempFileName, logFileName, logFile.offset), + timeout + ) + ); return Void(); } @@ -1468,17 +1493,13 @@ namespace fileBackup { return Void(); } - ACTOR static Future endLogFile(Database cx, Reference taskBucket, Reference futureBucket, Reference task, Reference tempFile, std::string tempFileName, std::string logFileName, int64_t size) { + ACTOR static Future endLogFile(Database cx, Reference task, Reference tempFile, std::string tempFileName, std::string logFileName, int64_t size) { state std::string backupContainer = task->params[FileBackupAgent::keyBackupContainer].toString(); try { if (tempFile) { Void _ = wait(truncateCloseFile(cx, task->params[FileBackupAgent::keyErrors], backupContainer, logFileName, tempFile, size)); } - bool isFinished = wait(taskBucket->isFinished(cx, task)); - if (isFinished) - return Void(); - Void _ = wait(IBackupContainer::openContainer(backupContainer)->renameFile(tempFileName, logFileName)); } catch (Error &e) { diff --git a/fdbclient/TaskBucket.h b/fdbclient/TaskBucket.h index e4f6d7e334..db558dfcae 100644 --- a/fdbclient/TaskBucket.h +++ b/fdbclient/TaskBucket.h @@ -157,6 +157,9 @@ public: Database src; Map>>> key_version; + double getTimeoutSeconds() const { + return (double)timeout / CLIENT_KNOBS->CORE_VERSIONSPERSECOND; + } private: friend class TaskBucketImpl; diff --git a/fdbrpc/BlobStore.actor.cpp b/fdbrpc/BlobStore.actor.cpp index dabc94641b..6cfb1c885f 100644 --- a/fdbrpc/BlobStore.actor.cpp +++ b/fdbrpc/BlobStore.actor.cpp @@ -688,7 +688,7 @@ ACTOR Future uploadPart_impl(Reference bstore, s HTTP::Headers headers; // Send MD5 sum for content so blobstore can verify it headers["Content-MD5"] = contentMD5; - state Reference r; // = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen)); + state Reference r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen)); // For uploads, Blobstore returns an MD5 sum of uploaded content so check that too. auto sum = r->headers.find("Content-MD5");