Refactor submitParallelRestore function into FileBackupAgent
This commit is contained in:
parent
5584884c12
commit
81f7181c9e
|
@ -276,7 +276,9 @@ public:
|
|||
static StringRef restoreStateText(ERestoreState id);
|
||||
|
||||
// parallel restore
|
||||
Future<Version> parallelRestoreFinish(Database cx);
|
||||
Future<Void> parallelRestoreFinish(Database cx);
|
||||
Future<Void> submitParallelRestore(Database cx, Key backupTag, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
KeyRef bcUrl, Version targetVersion, bool locked);
|
||||
|
||||
// restore() will
|
||||
// - make sure that url is readable and appears to be a complete backup
|
||||
|
|
|
@ -4543,12 +4543,41 @@ Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx) {
|
|||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr2.onError(e));
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> FileBackupAgent::submitParallelRestore(Database cx, Key backupTag,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
|
||||
Version targetVersion, bool locked) {
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state int restoreIndex = 0;
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
// Note: we always lock DB here in case DB is modified at the bacupRanges boundary.
|
||||
for (restoreIndex = 0; restoreIndex < backupRanges.size(); restoreIndex++) {
|
||||
auto range = backupRanges[restoreIndex];
|
||||
Standalone<StringRef> restoreTag(backupTag.toString() + "_" + std::to_string(restoreIndex));
|
||||
// Register the request request in DB, which will be picked up by restore worker leader
|
||||
struct RestoreRequest restoreRequest(restoreIndex, restoreTag, bcUrl, true, targetVersion, true, range,
|
||||
Key(), Key(), locked, deterministicRandom()->randomUniqueID());
|
||||
tr.set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
|
||||
}
|
||||
tr.set(restoreRequestTriggerKey,
|
||||
restoreRequestTriggerValue(deterministicRandom()->randomUniqueID(), backupRanges.size()));
|
||||
wait(tr.commit()); // Trigger restore
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<Version> FileBackupAgent::restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete, Version targetVersion, bool verbose, Key addPrefix, Key removePrefix, bool lockDB) {
|
||||
return FileBackupAgentImpl::restore(this, cx, cxOrig, tagName, url, ranges, waitForComplete, targetVersion, verbose, addPrefix, removePrefix, lockDB, deterministicRandom()->randomUniqueID());
|
||||
}
|
||||
|
|
|
@ -447,35 +447,10 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
state std::vector<Future<Version>> restores;
|
||||
state std::vector<Standalone<StringRef>> restoreTags;
|
||||
|
||||
// Restore each range by calling backupAgent.restore()
|
||||
// Submit parallel restore requests
|
||||
TraceEvent("FastRestore").detail("PrepareRestores", self->backupRanges.size());
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr1(cx);
|
||||
tr1.reset();
|
||||
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr1.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
try {
|
||||
// Note: we always lock DB here in case DB is modified at the bacupRanges boundary.
|
||||
for (restoreIndex = 0; restoreIndex < self->backupRanges.size(); restoreIndex++) {
|
||||
auto range = self->backupRanges[restoreIndex];
|
||||
Standalone<StringRef> restoreTag(self->backupTag.toString() + "_" +
|
||||
std::to_string(restoreIndex));
|
||||
restoreTags.push_back(restoreTag);
|
||||
// Register the request request in DB, which will be picked up by restore worker leader
|
||||
struct RestoreRequest restoreRequest(
|
||||
restoreIndex, restoreTag, KeyRef(lastBackupContainer->getURL()), true, targetVersion,
|
||||
true, range, Key(), Key(), self->locked, deterministicRandom()->randomUniqueID());
|
||||
tr1.set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
|
||||
}
|
||||
tr1.set(restoreRequestTriggerKey,
|
||||
restoreRequestTriggerValue(deterministicRandom()->randomUniqueID(),
|
||||
self->backupRanges.size()));
|
||||
wait(tr1.commit()); // Trigger restore
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr1.onError(e));
|
||||
}
|
||||
};
|
||||
wait(backupAgent.submitParallelRestore(cx, self->backupTag, self->backupRanges,
|
||||
KeyRef(lastBackupContainer->getURL()), targetVersion, self->locked));
|
||||
TraceEvent("FastRestore").detail("TriggerRestore", "Setting up restoreRequestTriggerKey");
|
||||
|
||||
// Sometimes kill and restart the restore
|
||||
|
@ -503,7 +478,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
// We should wait on all restore before proceeds
|
||||
// Wait for parallel restore to finish before we can proceed
|
||||
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
|
||||
wait(backupAgent.parallelRestoreFinish(cx));
|
||||
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "RestoreFinished");
|
||||
|
|
Loading…
Reference in New Issue