Refactor parallelRestoreFinish function into FileBackupAgent
This commit is contained in:
parent
4e477f5489
commit
5584884c12
|
@ -275,6 +275,9 @@ public:
|
|||
enum ERestoreState { UNITIALIZED = 0, QUEUED = 1, STARTING = 2, RUNNING = 3, COMPLETED = 4, ABORTED = 5 };
|
||||
static StringRef restoreStateText(ERestoreState id);
|
||||
|
||||
// parallel restore
|
||||
Future<Version> parallelRestoreFinish(Database cx);
|
||||
|
||||
// restore() will
|
||||
// - make sure that url is readable and appears to be a complete backup
|
||||
// - make sure the requested TargetVersion is valid
|
||||
|
|
|
@ -4518,6 +4518,37 @@ const std::string BackupAgentBase::defaultTagName = "default";
|
|||
const int BackupAgentBase::logHeaderSize = 12;
|
||||
const int FileBackupAgent::dataFooterSize = 20;
|
||||
|
||||
// Return if parallel restore has finished
|
||||
Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx) {
|
||||
state bool restoreDone = false;
|
||||
state Future<Void> watchForRestoreRequestDone;
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
if (restoreDone) break;
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> restoreRequestDoneKeyValue = wait(tr.get(restoreRequestDoneKey));
|
||||
// Restore may finish before restoreAgent waits on the restore finish event.
|
||||
if (restoreRequestDoneKeyValue.present()) {
|
||||
restoreDone = true; // In case commit clears the key but in unknown_state
|
||||
tr.clear(restoreRequestDoneKey);
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} else {
|
||||
watchForRestoreRequestDone = tr.watch(restoreRequestDoneKey);
|
||||
wait(tr.commit());
|
||||
wait(watchForRestoreRequestDone);
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr2.onError(e));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Version> FileBackupAgent::restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete, Version targetVersion, bool verbose, Key addPrefix, Key removePrefix, bool lockDB) {
|
||||
return FileBackupAgentImpl::restore(this, cx, cxOrig, tagName, url, ranges, waitForComplete, targetVersion, verbose, addPrefix, removePrefix, lockDB, deterministicRandom()->randomUniqueID());
|
||||
}
|
||||
|
|
|
@ -505,32 +505,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
// We should wait on all restore before proceeds
|
||||
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
|
||||
restoreDone = false;
|
||||
state Future<Void> watchForRestoreRequestDone;
|
||||
loop {
|
||||
try {
|
||||
if (restoreDone) break;
|
||||
tr2.reset();
|
||||
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr2.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> restoreRequestDoneKeyValue = wait(tr2.get(restoreRequestDoneKey));
|
||||
// Restore may finish before restoreAgent waits on the restore finish event.
|
||||
if (restoreRequestDoneKeyValue.present()) {
|
||||
restoreDone = true; // In case commit clears the key but in unknown_state
|
||||
tr2.clear(restoreRequestDoneKey);
|
||||
wait(tr2.commit());
|
||||
break;
|
||||
} else {
|
||||
watchForRestoreRequestDone = tr2.watch(restoreRequestDoneKey);
|
||||
wait(tr2.commit());
|
||||
wait(watchForRestoreRequestDone);
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr2.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
wait(backupAgent.parallelRestoreFinish(cx));
|
||||
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "RestoreFinished");
|
||||
|
||||
for (auto& restore : restores) {
|
||||
|
|
Loading…
Reference in New Issue