Missed file from merge of master into backup-refactor

This commit is contained in:
Stephen Atherton 2017-10-12 11:04:11 -07:00 committed by Bhaskar Muppana
parent 11517f7bfc
commit 659e39103e
1 changed files with 49 additions and 26 deletions

View File

@ -134,7 +134,7 @@ public:
return Void();
}
TraceEvent(SevWarn, "FileRestoreError").error(e).detail("RestoreUID", uid).detail("Description", details).detail("TaskInstance", (uint64_t)taskInstance);
std::string msg = format("ERROR: %s %s", e.what(), details.c_str());
std::string msg = format("ERROR: %s (%s)", details.c_str(), e.what());
return lastError().set(cx, {msg, (int64_t)now()});
}
@ -311,6 +311,29 @@ FileBackupAgent::FileBackupAgent()
namespace fileBackup {
// Try to save and extend task repeatedly until it fails or f is ready (or throws)
// In case the task was started or saveAndExtend'd recently, firstSaveAndExtendTimestamp can be used to indicate
// when the first saveAndExtend should be done.
ACTOR static Future<Void> saveAndExtendIncrementally(Database cx, Reference<TaskBucket> taskBucket, Reference<Task> task, Future<Void> f, double firstSaveAndExtendTimestamp = 0) {
// delaySeconds is half of the taskBucket task timeout.
state double delaySeconds = 0.5 * taskBucket->getTimeoutSeconds();
state Future<Void> timeout = delayUntil(firstSaveAndExtendTimestamp);
loop {
choose {
when(Void _ = wait(f)) {
break;
}
when(Void _ = wait(timeout)) {
bool keepGoing = wait(taskBucket->saveAndExtend(cx, task));
if(!keepGoing)
throw timed_out();
timeout = delay(delaySeconds);
}
}
}
return Void();
}
ACTOR static Future<Void> writeString(Reference<IAsyncFile> file, Standalone<StringRef> s, int64_t *pOffset) {
state uint32_t lenBuf = bigEndian32((uint32_t)s.size());
Void _ = wait(file->write(&lenBuf, sizeof(lenBuf), *pOffset));
@ -637,7 +660,7 @@ namespace fileBackup {
}
catch (Error &e) {
state Error err = e;
Void _ = wait(config.logError(cx, e, format("ERROR: Failed to open file `%s' because of error %s", fileName.c_str(), err.what())));
Void _ = wait(config.logError(cx, e, format("ERROR: Failed to open file `%s' because of error: %s", fileName.c_str(), err.what())));
throw err;
}
}
@ -669,7 +692,7 @@ namespace fileBackup {
throw;
state Error err = e;
Void _ = wait(config.logError(cx, err, format("ERROR: Failed to write to file `%s' in container '%s' because of error %s", fileName.c_str(), backupContainer.c_str(), err.what())));
Void _ = wait(config.logError(cx, err, format("ERROR: Failed to write to file `%s' in container '%s' because of error: %s", fileName.c_str(), backupContainer.c_str(), err.what())));
throw err;
}
@ -751,7 +774,7 @@ namespace fileBackup {
throw;
state Error err = e;
Void _ = wait(config.logError(tr->getDatabase(), err, format("ERROR: Failed to write to file `%s' because of error %s", filename.c_str(), err.what())));
Void _ = wait(config.logError(tr->getDatabase(), err, format("ERROR: Failed to write to file `%s' because of error: %s", filename.c_str(), err.what())));
throw err;
}
@ -973,13 +996,12 @@ namespace fileBackup {
if (outFile){
TEST(true); // Backup range task wrote multiple versions
bool isFinished = wait(taskBucket->isFinished(cx, task));
if (isFinished){
Void _ = wait(truncateCloseFile(cx, backup, backupContainer, outFileName, outFile));
return Void();
}
state Key nextKey = keyAfter(lastKey);
Void _ = wait(endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, nextKey, outVersion));
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, endKey, outVersion),
timeout // time at which to do the first saveAndExtend
)
);
// outFileName has now been modified to be the file's final (non temporary) name.
bool keepGoing = wait(recordRangeFile(backup, cx, task, taskBucket, KeyRangeRef(beginKey, nextKey), outFileName));
@ -1023,13 +1045,12 @@ namespace fileBackup {
if (err.code() == error_code_end_of_stream) {
if (outFile) {
bool isFinished = wait(taskBucket->isFinished(cx, task));
if (isFinished){
Void _ = wait(truncateCloseFile(cx, backup, backupContainer, outFileName, outFile));
return Void();
}
try {
Void _ = wait(endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, endKey, outVersion));
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, endKey, outVersion),
timeout // time at which to do the first saveAndExtend
)
);
// outFileName has now been modified to be the file's final (non temporary) name.
bool keepGoing = wait(recordRangeFile(backup, cx, task, taskBucket, KeyRangeRef(beginKey, endKey), outFileName));
@ -1043,7 +1064,7 @@ namespace fileBackup {
throw e2;
}
Void _ = wait(backup.logError(cx, e2, format("ERROR: Failed to write to file `%s' because of error %s", outFileName.c_str(), e2.what())));
Void _ = wait(backup.logError(cx, e2, format("ERROR: Failed to write to file `%s' because of error: %s", outFileName.c_str(), e2.what())));
throw e2;
}
}
@ -1051,7 +1072,7 @@ namespace fileBackup {
return Void();
}
Void _ = wait(backup.logError(cx, err, format("ERROR: Failed to write to file `%s' because of error %s", outFileName.c_str(), err.what())));
Void _ = wait(backup.logError(cx, err, format("ERROR: Failed to write to file `%s' because of error: %s", outFileName.c_str(), err.what())));
throw err;
}
@ -1204,7 +1225,8 @@ namespace fileBackup {
}
state Error err = e;
Void _ = wait(config.logError(cx, err, format("ERROR: Failed to write to file `%s' because of error %s", fileName.c_str(), err.what())));
Void _ = wait(config.logError(cx, err, format("ERROR: Failed to write to file `%s' because of error: %s", fileName.c_str(), err.what())));
throw err;
}
@ -1286,7 +1308,12 @@ namespace fileBackup {
Params.fileSize().set(task, logFile.offset);
std::string logFileName = FileBackupAgent::getLogFilename(beginVersion, endVersion, logFile.offset, logFile.blockSize);
Void _ = wait(endLogFile(cx, taskBucket, futureBucket, task, outFile, tempFileName, logFileName, logFile.offset, backupContainer));
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endLogFile(cx, task, outFile, tempFileName, logFileName, logFile.offset, backupContainer),
timeout // time at which to do the first saveAndExtend
)
);
return Void();
}
@ -1360,16 +1387,12 @@ namespace fileBackup {
return Void();
}
ACTOR static Future<Void> endLogFile(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task, Reference<IAsyncFile> tempFile, std::string tempFileName, std::string logFileName, int64_t size, std::string backupContainer) {
ACTOR static Future<Void> endLogFile(Database cx, Reference<Task> task, Reference<IAsyncFile> tempFile, std::string tempFileName, std::string logFileName, int64_t size, std::string backupContainer) {
try {
if (tempFile) {
Void _ = wait(truncateCloseFile(cx, BackupConfig(task), backupContainer, logFileName, tempFile, size));
}
bool isFinished = wait(taskBucket->isFinished(cx, task));
if (isFinished)
return Void();
Void _ = wait(IBackupContainer::openContainer(backupContainer)->renameFile(tempFileName, logFileName));
}
catch (Error &e) {
@ -3242,7 +3265,7 @@ public:
try {
Void _ = wait(timeoutError(bc->create(), 30));
} catch(Error &e) {
fprintf(stderr, "ERROR: Could not create backup container: %s\n", e.what());
fprintf(stderr, "ERROR: Could not create backup container: %s\n", e.what());
throw backup_error();
}