Improved behavior of slow writes during backup. KeyRange and Log backup tasks now use TaskBucket::saveAndExtend() to keep the task alive until flushing the file finishes or fails with an error (blob uploads fail after a limited number of retries). This prevents blob uploads from being retried too often if the destination is slow since a task abort and retry would start the backoff counters back at zero. Also removed a debugging behavior that was accidentally checked in.

This commit is contained in:
Stephen Atherton 2017-10-01 16:01:24 -07:00
parent a098919b20
commit a95107417f
3 changed files with 43 additions and 19 deletions

View File

@ -446,6 +446,29 @@ FileBackupAgent::FileBackupAgent()
namespace fileBackup {
// Try to save and extend task repeatedly until it fails or f is ready (or throws)
// In case the task was started or saveAndExtend'd recently, firstSaveAndExtendTimestamp can be used to indicate
// when the first saveAndExtend should be done.
ACTOR static Future<Void> saveAndExtendIncrementally(Database cx, Reference<TaskBucket> taskBucket, Reference<Task> task, Future<Void> f, double firstSaveAndExtendTimestamp = 0) {
// delaySeconds is half of the taskBucket task timeout.
state double delaySeconds = 0.5 * taskBucket->getTimeoutSeconds();
state Future<Void> timeout = delayUntil(firstSaveAndExtendTimestamp);
loop {
choose {
when(Void _ = wait(f)) {
break;
}
when(Void _ = wait(timeout)) {
bool keepGoing = wait(taskBucket->saveAndExtend(cx, task));
if(!keepGoing)
throw timed_out();
timeout = delay(delaySeconds);
}
}
}
return Void();
}
ACTOR static Future<Void> writeString(Reference<IAsyncFile> file, Standalone<StringRef> s, int64_t *pOffset) {
state uint32_t lenBuf = bigEndian32((uint32_t)s.size());
Void _ = wait(file->write(&lenBuf, sizeof(lenBuf), *pOffset));
@ -1088,13 +1111,12 @@ namespace fileBackup {
if (outFile){
TEST(true); // Backup range task wrote multiple versions
bool isFinished = wait(taskBucket->isFinished(cx, task));
if (isFinished){
Void _ = wait(truncateCloseFile(cx, task->params[FileBackupAgent::keyErrors], task->params[FileBackupAgent::keyBackupContainer].toString(), outFileName, outFile));
return Void();
}
state Key nextKey = keyAfter(lastKey);
Void _ = wait(endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, nextKey, outVersion));
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, nextKey, outVersion),
timeout
)
);
// outFileName has now been modified to be the file's final (non temporary) name.
bool keepGoing = wait(recordRangeFile(backup, cx, task, taskBucket, KeyRangeRef(beginKey, nextKey), outFileName));
@ -1138,13 +1160,12 @@ namespace fileBackup {
if (err.code() == error_code_end_of_stream) {
if (outFile) {
bool isFinished = wait(taskBucket->isFinished(cx, task));
if (isFinished){
Void _ = wait(truncateCloseFile(cx, task->params[FileBackupAgent::keyErrors], task->params[FileBackupAgent::keyBackupContainer].toString(), outFileName, outFile));
return Void();
}
try {
Void _ = wait(endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, endKey, outVersion));
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endKeyRangeFile(cx, task->params[FileBackupAgent::keyErrors], &rangeFile, task->params[FileBackupAgent::keyBackupContainer].toString(), &outFileName, endKey, outVersion),
timeout
)
);
// outFileName has now been modified to be the file's final (non temporary) name.
bool keepGoing = wait(recordRangeFile(backup, cx, task, taskBucket, KeyRangeRef(beginKey, endKey), outFileName));
@ -1390,7 +1411,11 @@ namespace fileBackup {
task->params[BackupLogRangeTaskFunc::keyFileSize] = BinaryWriter::toValue<int64_t>(logFile.offset, Unversioned());
std::string logFileName = FileBackupAgent::getLogFilename(beginVersion, endVersion, logFile.offset, logFile.blockSize);
Void _ = wait(endLogFile(cx, taskBucket, futureBucket, task, outFile, tempFileName, logFileName, logFile.offset));
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endLogFile(cx, task, outFile, tempFileName, logFileName, logFile.offset),
timeout
)
);
return Void();
}
@ -1468,17 +1493,13 @@ namespace fileBackup {
return Void();
}
ACTOR static Future<Void> endLogFile(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task, Reference<IAsyncFile> tempFile, std::string tempFileName, std::string logFileName, int64_t size) {
ACTOR static Future<Void> endLogFile(Database cx, Reference<Task> task, Reference<IAsyncFile> tempFile, std::string tempFileName, std::string logFileName, int64_t size) {
state std::string backupContainer = task->params[FileBackupAgent::keyBackupContainer].toString();
try {
if (tempFile) {
Void _ = wait(truncateCloseFile(cx, task->params[FileBackupAgent::keyErrors], backupContainer, logFileName, tempFile, size));
}
bool isFinished = wait(taskBucket->isFinished(cx, task));
if (isFinished)
return Void();
Void _ = wait(IBackupContainer::openContainer(backupContainer)->renameFile(tempFileName, logFileName));
}
catch (Error &e) {

View File

@ -157,6 +157,9 @@ public:
Database src;
Map<Key, Future<Reference<KeyRangeMap<Version>>>> key_version;
double getTimeoutSeconds() const {
return (double)timeout / CLIENT_KNOBS->CORE_VERSIONSPERSECOND;
}
private:
friend class TaskBucketImpl;

View File

@ -688,7 +688,7 @@ ACTOR Future<std::string> uploadPart_impl(Reference<BlobStoreEndpoint> bstore, s
HTTP::Headers headers;
// Send MD5 sum for content so blobstore can verify it
headers["Content-MD5"] = contentMD5;
state Reference<HTTP::Response> r; // = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
state Reference<HTTP::Response> r = wait(bstore->doRequest("PUT", resource, headers, pContent, contentLen));
// For uploads, Blobstore returns an MD5 sum of uploaded content so check that too.
auto sum = r->headers.find("Content-MD5");