FastRestoreAgent: Use atomicParallelRestore to kick off restore

Replace the handcrafted version with the atomicParallelRestore actor,
which is simulation tested.
This commit is contained in:
Meng Xu 2020-04-27 22:07:45 -07:00
parent 28ce9fe736
commit f5e8345496
3 changed files with 45 additions and 109 deletions

View File

@ -2192,8 +2192,7 @@ ACTOR Future<Void> runRestore(Database db, std::string originalClusterFile, std:
// Fast restore agent that kicks off the restore: send restore requests to restore workers.
ACTOR Future<Void> runFastRestoreAgent(Database db, std::string tagName, std::string container,
Standalone<VectorRef<KeyRangeRef>> ranges, Version dbVersion,
bool performRestore, bool verbose, bool waitForDone, std::string addPrefix,
std::string removePrefix) {
bool performRestore, bool verbose, bool waitForDone) {
try {
state FileBackupAgent backupAgent;
state Version restoreVersion = invalidVersion;
@ -2219,9 +2218,26 @@ ACTOR Future<Void> runFastRestoreAgent(Database db, std::string tagName, std::st
dbVersion = desc.maxRestorableVersion.get();
TraceEvent("FastRestoreAgent").detail("TargetRestoreVersion", dbVersion);
}
Version _restoreVersion = wait(fastRestore(db, KeyRef(tagName), KeyRef(container), waitForDone, dbVersion,
verbose, range, KeyRef(addPrefix), KeyRef(removePrefix)));
restoreVersion = _restoreVersion;
state UID randomUID = deterministicRandom()->randomUniqueID();
TraceEvent("FastRestoreAgent")
.detail("SubmitRestoreRequests", ranges.size())
.detail("RestoreUID", randomUID);
wait(backupAgent.submitParallelRestore(db, KeyRef(tagName), ranges, KeyRef(container), dbVersion, true,
randomUID));
if (waitForDone) {
// Wait for parallel restore to finish and unlock DB after that
TraceEvent("FastRestoreAgent").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
wait(backupAgent.parallelRestoreFinish(db, randomUID));
TraceEvent("FastRestoreAgent").detail("BackupAndParallelRestore", "RestoreFinished");
} else {
TraceEvent("FastRestoreAgent")
.detail("RestoreUID", randomUID)
.detail("OperationGuide", "Manually unlock DB when restore finishes");
printf("WARNING: DB will be in locked state after restore. Need UID:%s to unlock DB\n",
randomUID.toString());
}
restoreVersion = dbVersion;
} else {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container);
state BackupDescription description = wait(bc->describeBackup());
@ -3740,7 +3756,7 @@ int main(int argc, char* argv[]) {
switch (restoreType) {
case RESTORE_START:
f = stopAfter(runFastRestoreAgent(db, tagName, restoreContainer, backupKeys, restoreVersion, !dryRun,
!quietDisplay, waitForDone, addPrefix, removePrefix));
!quietDisplay, waitForDone));
break;
case RESTORE_WAIT:
printf("[TODO][ERROR] FastRestore does not support RESTORE_WAIT yet!\n");
@ -3887,102 +3903,3 @@ int main(int argc, char* argv[]) {
flushAndExit(status);
}
//------Restore Agent: Kick off the restore by sending the restore requests
// Blocks until restoreRequestDoneKey is observed in the database, clears that key, and then
// reports the restore as completed.
// Parameters:
//   cx      - database the polling transaction runs against.
//   tagName - not referenced anywhere in this body.
//   verbose - not referenced anywhere in this body.
// Returns: FileBackupAgent::ERestoreState::COMPLETED; this is the only value the body returns.
// Errors raised inside the loop are handed to tr.onError(e) before the next iteration.
ACTOR static Future<FileBackupAgent::ERestoreState> waitFastRestore(Database cx, Key tagName, bool verbose) {
// We should wait on all restore to finish before proceeds
TraceEvent("FastRestore").detail("Progress", "WaitForRestoreToFinish");
state ReadYourWritesTransaction tr(cx);
state Future<Void> fRestoreRequestDone;
// Set to true once the done key has been seen; declared outside the loop so it survives retries.
state bool restoreRequestDone = false;
loop {
try {
// Every attempt resets the transaction and then sets the three options again.
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// In case restoreRequestDoneKey is already set before we set watch on it
Optional<Value> restoreRequestDoneKeyValue = wait(tr.get(restoreRequestDoneKey));
if (restoreRequestDoneKeyValue.present()) {
// The flag is recorded before the commit: if the commit throws and a later attempt
// no longer finds the key, the loop still exits through the final else branch below.
restoreRequestDone = true;
tr.clear(restoreRequestDoneKey);
wait(tr.commit());
break;
} else if (!restoreRequestDone) {
// Key absent and never seen: register a watch, commit, then wait on the watch future.
// The next loop iteration re-reads the key.
fRestoreRequestDone = tr.watch(restoreRequestDoneKey);
wait(tr.commit());
wait(fRestoreRequestDone);
} else {
// Key absent but it was seen on an earlier attempt; nothing is left to clear.
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("FastRestore").detail("Progress", "RestoreFinished");
return FileBackupAgent::ERestoreState::COMPLETED;
}
// Checks that the backup at `url` has a restore set for the target version, writes one restore
// request together with the restore trigger key in a single transaction, and optionally waits for
// the restore to finish.
// Parameters:
//   cx              - target database.
//   tagName         - prefix of the restore tag; the tag written is tagName + "_" + restoreIndex.
//   url             - backup container URL passed to IBackupContainer::openContainer.
//   waitForComplete - when true, waitFastRestore() is awaited before returning.
//   targetVersion   - version to restore to; if it equals invalidVersion and
//                     desc.maxRestorableVersion is present, that version is used instead.
//   verbose         - only forwarded to waitFastRestore().
//   range           - key range placed in the restore request.
//   addPrefix, removePrefix - not referenced in this body. The request is built with two
//                     default-constructed Key() arguments instead (presumably the prefix slots;
//                     confirm against the RestoreRequest constructor).
// Returns: targetVersion after the substitution described above.
// Throws: restore_invalid_version when getRestoreSet() returns no set; restore_error when the
//         state returned by waitFastRestore() is not COMPLETED.
ACTOR static Future<Version> _fastRestore(Database cx, Key tagName, Key url, bool waitForComplete,
Version targetVersion, bool verbose, KeyRange range, Key addPrefix,
Key removePrefix) {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
// Resolve an unspecified target version before asking the container for a restore set.
if (targetVersion == invalidVersion && desc.maxRestorableVersion.present())
targetVersion = desc.maxRestorableVersion.get();
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));
TraceEvent("FastRestore").detail("BackupDesc", desc.toString()).detail("TargetVersion", targetVersion);
if (!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
.detail("BackupContainer", bc->getURL())
.detail("TargetVersion", targetVersion);
throw restore_invalid_version();
}
// NOTE: The restore agent makes sure we only support 1 restore range for each restore request for now!
// The simulation test did test restoring multiple restore ranges in one restore request though.
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// restoreIndex is never modified below, so the request index and the tag suffix are always 0.
state int restoreIndex = 0;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<StringRef> restoreTag(tagName.toString() + "_" + std::to_string(restoreIndex));
bool locked = true;
struct RestoreRequest restoreRequest(restoreIndex, restoreTag, KeyRef(bc->getURL()), true, targetVersion,
true, range, Key(), Key(), locked,
deterministicRandom()->randomUniqueID());
tr->set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
// backupRanges.size = 1 because we only support restoring 1 range in real mode for now
tr->set(restoreRequestTriggerKey, restoreRequestTriggerValue(deterministicRandom()->randomUniqueID(),1));
wait(tr->commit()); // Trigger fast restore
break;
} catch (Error& e) {
// NOTE(review): for restore_duplicate_tag the error is not passed to onError(), so the loop
// retries immediately with the same tag and the same transaction object — confirm intended.
if (e.code() != error_code_restore_duplicate_tag) {
wait(tr->onError(e));
}
}
}
if (waitForComplete) {
FileBackupAgent::ERestoreState finalState = wait(waitFastRestore(cx, tagName, verbose));
if (finalState != FileBackupAgent::ERestoreState::COMPLETED) throw restore_error();
}
return targetVersion;
}
// Public entry point for kicking off a fast restore: forwards every argument unchanged to
// _fastRestore() and returns the Version that actor produces.
ACTOR Future<Version> fastRestore(Database cx, Standalone<StringRef> tagName, Standalone<StringRef> url,
                                  bool waitForComplete, long targetVersion, bool verbose,
                                  Standalone<KeyRangeRef> range, Standalone<StringRef> addPrefix,
                                  Standalone<StringRef> removePrefix) {
	Version restoredVersion = wait(
	    _fastRestore(cx, tagName, url, waitForComplete, targetVersion, verbose, range, addPrefix, removePrefix));
	return restoredVersion;
}

View File

@ -893,10 +893,6 @@ public:
}
};
ACTOR Future<Version> fastRestore(Database cx, Standalone<StringRef> tagName, Standalone<StringRef> url,
bool waitForComplete, long targetVersion, bool verbose, Standalone<KeyRangeRef> range,
Standalone<StringRef> addPrefix, Standalone<StringRef> removePrefix);
// Helper class for reading restore data from a buffer and throwing the right errors.
struct StringRefReader {
StringRefReader(StringRef s = StringRef(), Error e = Error()) : rptr(s.begin()), end(s.end()), failure_error(e) {}

View File

@ -3628,6 +3628,29 @@ public:
ACTOR static Future<Void> submitParallelRestore(Database cx, Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
Version targetVersion, bool lockDB, UID randomUID) {
// Sanity check backup is valid
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString());
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));
if (!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
.detail("BackupContainer", bc->getURL())
.detail("TargetVersion", targetVersion);
throw restore_invalid_version();
}
if (targetVersion == invalidVersion && desc.maxRestorableVersion.present()) {
targetVersion = desc.maxRestorableVersion.get();
TraceEvent(SevWarn, "FastRestoreSubmitRestoreRequestWithInvalidTargetVersion")
.detail("OverrideTargetVersion", targetVersion);
}
TraceEvent("FastRestoreSubmitRestoreRequest")
.detail("BackupDesc", desc.toString())
.detail("TargetVersion", targetVersion);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int restoreIndex = 0;
state int numTries = 0;