Merge pull request #2855 from xumengpanda/mengxu/fr-api-atomicrestore-PR

Add ApiCorrectnessAtomicRestore workload for the new performant restore
Jingyu Zhou 2020-03-25 18:05:26 -07:00 committed by GitHub
commit feedab02a0
9 changed files with 237 additions and 124 deletions

View File

@ -3879,7 +3879,7 @@ ACTOR static Future<FileBackupAgent::ERestoreState> waitFastRestore(Database cx,
// We should wait for all restores to finish before proceeding
TraceEvent("FastRestore").detail("Progress", "WaitForRestoreToFinish");
state ReadYourWritesTransaction tr(cx);
state Future<Void> watchForRestoreRequestDone;
state Future<Void> fRestoreRequestDone;
state bool restoreRequestDone = false;
loop {
@ -3887,6 +3887,7 @@ ACTOR static Future<FileBackupAgent::ERestoreState> waitFastRestore(Database cx,
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// In case restoreRequestDoneKey is already set before we set a watch on it
Optional<Value> restoreRequestDoneKeyValue = wait(tr.get(restoreRequestDoneKey));
if (restoreRequestDoneKeyValue.present()) {
@ -3894,12 +3895,13 @@ ACTOR static Future<FileBackupAgent::ERestoreState> waitFastRestore(Database cx,
tr.clear(restoreRequestDoneKey);
wait(tr.commit());
break;
} else {
watchForRestoreRequestDone = tr.watch(restoreRequestDoneKey);
} else if (!restoreRequestDone) {
fRestoreRequestDone = tr.watch(restoreRequestDoneKey);
wait(tr.commit());
wait(fRestoreRequestDone);
} else {
break;
}
// The clearing transaction may fail with an unknown commit result, in which case restoreRequestDoneKey may already have been cleared
if (restoreRequestDone) break;
} catch (Error& e) {
wait(tr.onError(e));
}
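The hunk above is FoundationDB's usual watch-until-set idiom, extended so that an unknown commit result on the clearing transaction cannot hang the loop: the flag is set before the clear, so a retried iteration can break out even if the key was already removed. A minimal sketch of the same pattern, shown only for illustration (the actor name and key parameter are hypothetical; the transaction calls are the ones used in the hunk above):

// Illustrative sketch only, not part of this commit.
ACTOR Future<Void> waitAndConsumeKey(Database cx, Key key) {
    state ReadYourWritesTransaction tr(cx);
    state Future<Void> watch;
    state bool seen = false;
    loop {
        try {
            tr.reset();
            tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
            tr.setOption(FDBTransactionOptions::LOCK_AWARE);
            Optional<Value> v = wait(tr.get(key));
            if (v.present()) {
                seen = true; // Remember the key was observed before attempting to clear it
                tr.clear(key);
                wait(tr.commit());
                break;
            } else if (!seen) {
                watch = tr.watch(key); // Fires once the key is written
                wait(tr.commit());     // The watch only becomes active after the commit
                wait(watch);
            } else {
                break; // An earlier clear may have committed despite an unknown commit result
            }
        } catch (Error& e) {
            wait(tr.onError(e)); // Retries transient errors, including commit_unknown_result
        }
    }
    return Void();
}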

View File

@ -275,6 +275,13 @@ public:
enum ERestoreState { UNITIALIZED = 0, QUEUED = 1, STARTING = 2, RUNNING = 3, COMPLETED = 4, ABORTED = 5 };
static StringRef restoreStateText(ERestoreState id);
// parallel restore
Future<Void> parallelRestoreFinish(Database cx, UID randomUID);
Future<Void> submitParallelRestore(Database cx, Key backupTag, Standalone<VectorRef<KeyRangeRef>> backupRanges,
KeyRef bcUrl, Version targetVersion, bool lockDB, UID randomUID);
Future<Void> atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
Key addPrefix, Key removePrefix);
// restore() will
// - make sure that url is readable and appears to be a complete backup
// - make sure the requested TargetVersion is valid
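Together these two declarations give callers a submit-then-wait flow for the parallel restore. A hypothetical caller, sketched only to illustrate the new signatures (the surrounding actor and the backup URL are made up; targetVersion = -1 and lockDB = true mirror the values used elsewhere in this change):

// Illustrative sketch only, not part of this commit.
ACTOR Future<Void> runParallelRestoreExample(Database cx) {
    state FileBackupAgent backupAgent;
    state Standalone<VectorRef<KeyRangeRef>> ranges;
    ranges.push_back_deep(ranges.arena(), normalKeys);
    state UID randomUID = deterministicRandom()->randomUniqueID();
    // Locks the DB, registers one RestoreRequest per range, and sets the trigger key.
    wait(backupAgent.submitParallelRestore(cx, LiteralStringRef("default"), ranges,
                                           LiteralStringRef("file://simfdb/backups/"), -1 /*targetVersion*/,
                                           true /*lockDB*/, randomUID));
    // Blocks until restoreRequestDoneKey is set, then unlocks the DB with the same UID.
    wait(backupAgent.parallelRestoreFinish(cx, randomUID));
    return Void();
}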

View File

@ -3557,6 +3557,112 @@ class FileBackupAgentImpl {
public:
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
// Parallel restore
ACTOR static Future<Void> parallelRestoreFinish(Database cx, UID randomUID) {
state ReadYourWritesTransaction tr(cx);
state Future<Void> watchForRestoreRequestDone;
state bool restoreDone = false;
TraceEvent("FastRestoreAgentWaitForRestoreToFinish").detail("DBLock", randomUID);
loop {
try {
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> restoreRequestDoneKeyValue = wait(tr.get(restoreRequestDoneKey));
// Restore may finish before restoreAgent waits on the restore finish event.
if (restoreRequestDoneKeyValue.present()) {
restoreDone = true; // In case the commit clears the key but fails with an unknown commit result
tr.clear(restoreRequestDoneKey);
wait(tr.commit());
break;
} else if (!restoreDone) {
watchForRestoreRequestDone = tr.watch(restoreRequestDoneKey);
wait(tr.commit());
wait(watchForRestoreRequestDone);
} else {
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("FastRestoreAgentRestoreFinished").detail("UnlockDBStart", randomUID);
try {
wait(unlockDatabase(cx, randomUID));
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) { // Should only happen in simulation
TraceEvent(SevWarnAlways, "FastRestoreAgentOnCancelingActor")
.detail("DBLock", randomUID)
.detail("ManualCheck", "Is DB locked");
} else {
TraceEvent(SevError, "FastRestoreAgentUnlockDBFailed")
.detail("DBLock", randomUID)
.detail("ErrorCode", e.code())
.detail("Error", e.what());
ASSERT_WE_THINK(false); // This unlockDatabase should always succeed, we think.
}
}
TraceEvent("FastRestoreAgentRestoreFinished").detail("UnlockDBFinish", randomUID);
return Void();
}
ACTOR static Future<Void> submitParallelRestore(Database cx, Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
Version targetVersion, bool lockDB, UID randomUID) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int restoreIndex = 0;
state int numTries = 0;
// lock DB for restore
loop {
try {
if (lockDB) {
wait(lockDatabase(cx, randomUID));
}
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkDatabaseLock(tr, randomUID));
TraceEvent("FastRestoreAgentSubmitRestoreRequests").detail("DBIsLocked", randomUID);
break;
} catch (Error& e) {
TraceEvent("FastRestoreAgentSubmitRestoreRequests").detail("CheckLockError", e.what());
TraceEvent(numTries > 50 ? SevError : SevWarnAlways, "FastRestoreMayFail")
.detail("Reason", "DB is not properly locked")
.detail("ExpectedLockID", randomUID);
numTries++;
wait(delay(5.0));
}
}
// set up restore request
loop {
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// Note: we always lock the DB here in case the DB is modified at the backupRanges boundary.
for (restoreIndex = 0; restoreIndex < backupRanges.size(); restoreIndex++) {
auto range = backupRanges[restoreIndex];
Standalone<StringRef> restoreTag(backupTag.toString() + "_" + std::to_string(restoreIndex));
// Register the restore request in the DB, which will be picked up by the restore worker leader
struct RestoreRequest restoreRequest(restoreIndex, restoreTag, bcUrl, true, targetVersion, true,
range, Key(), Key(), lockDB,
deterministicRandom()->randomUniqueID());
tr->set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
}
tr->set(restoreRequestTriggerKey,
restoreRequestTriggerValue(deterministicRandom()->randomUniqueID(), backupRanges.size()));
wait(tr->commit()); // Trigger restore
break;
} catch (Error& e) {
wait(tr->onError(e));
}
}
return Void();
}
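One detail worth noting in the two actors above: submitParallelRestore takes the database lock under the caller-supplied randomUID, and parallelRestoreFinish later unlocks with that same UID, so a caller must thread a single UID through both calls. A minimal sketch of the pairing, outside this commit and with a hypothetical actor name (lockDatabase, checkDatabaseLock, and unlockDatabase are the same helpers used above):

// Illustrative sketch only, not part of this commit.
ACTOR Future<Void> doWorkUnderDatabaseLock(Database cx) {
    state UID lockUID = deterministicRandom()->randomUniqueID();
    wait(lockDatabase(cx, lockUID)); // Take the lock under lockUID
    state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr->setOption(FDBTransactionOptions::LOCK_AWARE);
    wait(checkDatabaseLock(tr, lockUID)); // Verify the lock is held under lockUID
    // ... only LOCK_AWARE transactions can run against the database here ...
    wait(unlockDatabase(cx, lockUID)); // Release with the same UID that took the lock
    return Void();
}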
// This method will return the final status of the backup at tag, and return the URL that was used on the tag
// when that status value was read.
ACTOR static Future<int> waitBackup(FileBackupAgent* backupAgent, Database cx, std::string tagName, bool stopWhenDone, Reference<IBackupContainer> *pContainer = nullptr, UID *pUID = nullptr) {
@ -4325,7 +4431,9 @@ public:
// Used for correctness only. Locks the database before discontinuing the backup; that same lock is then used while doing the restore.
// The tag name of the backup must be the same as the restore's.
ACTOR static Future<Version> atomicRestore(FileBackupAgent* backupAgent, Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges, Key addPrefix, Key removePrefix) {
ACTOR static Future<Version> atomicRestore(FileBackupAgent* backupAgent, Database cx, Key tagName,
Standalone<VectorRef<KeyRangeRef>> ranges, Key addPrefix,
Key removePrefix, bool fastRestore) {
state Reference<ReadYourWritesTransaction> ryw_tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
state BackupConfig backupConfig;
loop {
@ -4402,6 +4510,7 @@ public:
ryw_tr->reset();
loop {
try {
ryw_tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
ryw_tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -4419,9 +4528,30 @@ public:
Reference<IBackupContainer> bc = wait(backupConfig.backupContainer().getOrThrow(cx));
TraceEvent("AS_StartRestore");
Version ver = wait( restore(backupAgent, cx, cx, tagName, KeyRef(bc->getURL()), ranges, true, -1, true, addPrefix, removePrefix, true, randomUid) );
return ver;
if (fastRestore) {
TraceEvent("AtomicParallelRestoreStartRestore");
Version targetVersion = -1;
bool lockDB = true;
wait(submitParallelRestore(cx, tagName, ranges, KeyRef(bc->getURL()), targetVersion, lockDB, randomUid));
TraceEvent("AtomicParallelRestoreWaitForRestoreFinish");
wait(parallelRestoreFinish(cx, randomUid));
return -1;
} else {
TraceEvent("AS_StartRestore");
Version ver = wait(restore(backupAgent, cx, cx, tagName, KeyRef(bc->getURL()), ranges, true, -1, true,
addPrefix, removePrefix, true, randomUid));
return ver;
}
}
// Similar to atomicRestore; only used in simulation tests.
// Locks the database before discontinuing the backup; that same lock is then used while doing the restore.
// The tag name of the backup must be the same as the restore's.
ACTOR static Future<Void> atomicParallelRestore(FileBackupAgent* backupAgent, Database cx, Key tagName,
Standalone<VectorRef<KeyRangeRef>> ranges, Key addPrefix,
Key removePrefix) {
Version ver = wait(atomicRestore(backupAgent, cx, tagName, ranges, addPrefix, removePrefix, true));
return Void();
}
};
@ -4429,12 +4559,29 @@ const std::string BackupAgentBase::defaultTagName = "default";
const int BackupAgentBase::logHeaderSize = 12;
const int FileBackupAgent::dataFooterSize = 20;
// Returns when the parallel restore has finished
Future<Void> FileBackupAgent::parallelRestoreFinish(Database cx, UID randomUID) {
return FileBackupAgentImpl::parallelRestoreFinish(cx, randomUID);
}
Future<Void> FileBackupAgent::submitParallelRestore(Database cx, Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
Version targetVersion, bool lockDB, UID randomUID) {
return FileBackupAgentImpl::submitParallelRestore(cx, backupTag, backupRanges, bcUrl, targetVersion, lockDB,
randomUID);
}
Future<Void> FileBackupAgent::atomicParallelRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges,
Key addPrefix, Key removePrefix) {
return FileBackupAgentImpl::atomicParallelRestore(this, cx, tagName, ranges, addPrefix, removePrefix);
}
Future<Version> FileBackupAgent::restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete, Version targetVersion, bool verbose, Key addPrefix, Key removePrefix, bool lockDB) {
return FileBackupAgentImpl::restore(this, cx, cxOrig, tagName, url, ranges, waitForComplete, targetVersion, verbose, addPrefix, removePrefix, lockDB, deterministicRandom()->randomUniqueID());
}
Future<Version> FileBackupAgent::atomicRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges, Key addPrefix, Key removePrefix) {
return FileBackupAgentImpl::atomicRestore(this, cx, tagName, ranges, addPrefix, removePrefix);
return FileBackupAgentImpl::atomicRestore(this, cx, tagName, ranges, addPrefix, removePrefix, false);
}
Future<ERestoreState> FileBackupAgent::abortRestore(Reference<ReadYourWritesTransaction> tr, Key tagName) {
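At the public surface the change is additive: FileBackupAgent::atomicRestore keeps its existing behavior (the wrapper now passes fastRestore = false), while the fast path is reached through the new atomicParallelRestore, which returns Void rather than a restored version. A hypothetical caller choosing between the two paths, written only to illustrate the wrappers above:

// Illustrative sketch only, not part of this commit.
ACTOR Future<Void> atomicRestoreEitherPath(Database cx, FileBackupAgent* agent,
                                           Standalone<VectorRef<KeyRangeRef>> ranges, bool useFastRestore) {
    if (useFastRestore) {
        // New parallel path: the restored version is not reported back.
        wait(agent->atomicParallelRestore(cx, BackupAgentBase::getDefaultTag(), ranges, StringRef(), StringRef()));
    } else {
        // Old path: returns the version the database was restored to.
        Version ver = wait(agent->atomicRestore(cx, BackupAgentBase::getDefaultTag(), ranges, StringRef(), StringRef()));
        TraceEvent("AtomicRestoreExampleDone").detail("Version", ver);
    }
    return Void();
}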

View File

@ -187,29 +187,7 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
TraceEvent("FastRestoreMasterWaitOnRestoreRequests", self->id());
// lock DB for restore
numTries = 0;
loop {
try {
wait(lockDatabase(cx, randomUID));
state Reference<ReadYourWritesTransaction> tr =
Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
tr->reset();
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
wait(checkDatabaseLock(tr, randomUID));
TraceEvent("FastRestoreMasterProcessRestoreRequests", self->id()).detail("DBIsLocked", randomUID);
break;
} catch (Error& e) {
TraceEvent("FastRestoreMasterProcessRestoreRequests", self->id()).detail("CheckLockError", e.what());
TraceEvent(numTries > 50 ? SevError : SevWarnAlways, "FastRestoreMayFail")
.detail("Reason", "DB is not properly locked")
.detail("ExpectedLockID", randomUID);
numTries++;
wait(delay(5.0));
}
}
// DB has been locked where restore request is submitted
wait(clearDB(cx));
// Step: Perform the restore requests
@ -237,22 +215,6 @@ ACTOR Future<Void> startProcessRestoreRequests(Reference<RestoreMasterData> self
// Step: Notify all restore requests have been handled by cleaning up the restore keys
wait(signalRestoreCompleted(self, cx));
try {
wait(unlockDatabase(cx, randomUID));
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) { // Should only happen in simulation
TraceEvent(SevWarnAlways, "FastRestoreMasterOnCancelingActor", self->id())
.detail("DBLock", randomUID)
.detail("ManualCheck", "Is DB locked");
} else {
TraceEvent(SevError, "FastRestoreMasterUnlockDBFailed", self->id())
.detail("DBLock", randomUID)
.detail("ErrorCode", e.code())
.detail("Error", e.what());
ASSERT_WE_THINK(false); // This unlockDatabase should always succeed, we think.
}
}
TraceEvent("FastRestoreMasterRestoreCompleted", self->id());
return Void();
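With the hunks above, locking and unlocking the database no longer happen in the restore master: submitParallelRestore locks before the requests are written and parallelRestoreFinish unlocks after the done key is observed. startProcessRestoreRequests therefore shrinks to roughly the shape below (a simplified sketch, not the literal remaining code; clearDB, signalRestoreCompleted, and self->id() are the calls shown in this file):

// Simplified sketch of the post-change control flow; request handling and error paths elided.
ACTOR Future<Void> startProcessRestoreRequestsSketch(Reference<RestoreMasterData> self, Database cx) {
    // The DB was already locked by the agent that submitted the restore requests.
    wait(clearDB(cx));
    // ... collect the pending restore requests and process them ...
    // Signal completion; the submitting agent observes the done key and unlocks the DB.
    wait(signalRestoreCompleted(self, cx));
    TraceEvent("FastRestoreMasterRestoreCompleted", self->id());
    return Void();
}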

View File

@ -27,6 +27,7 @@
// A workload which tests the correctness of the backup and restore process
struct AtomicRestoreWorkload : TestWorkload {
double startAfter, restoreAfter;
bool fastRestore; // true: use fast restore, false: use old style restore
Standalone<VectorRef<KeyRangeRef>> backupRanges;
AtomicRestoreWorkload(WorkloadContext const& wcx)
@ -34,6 +35,7 @@ struct AtomicRestoreWorkload : TestWorkload {
startAfter = getOption(options, LiteralStringRef("startAfter"), 10.0);
restoreAfter = getOption(options, LiteralStringRef("restoreAfter"), 20.0);
fastRestore = getOption(options, LiteralStringRef("fastRestore"), false);
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
}
@ -79,26 +81,31 @@ struct AtomicRestoreWorkload : TestWorkload {
wait( delay(self->restoreAfter * deterministicRandom()->random01()) );
TraceEvent("AtomicRestore_RestoreStart");
loop {
std::vector<Future<Version>> restores;
if (deterministicRandom()->random01() < 0.5) {
for (auto &range : self->backupRanges)
restores.push_back(backupAgent.atomicRestore(cx, BackupAgentBase::getDefaultTag(), range, StringRef(), StringRef()));
if (self->fastRestore) { // New fast parallel restore
TraceEvent(SevWarnAlways, "AtomicParallelRestore");
wait(backupAgent.atomicParallelRestore(cx, BackupAgentBase::getDefaultTag(), self->backupRanges,
StringRef(), StringRef()));
} else { // Old style restore
loop {
std::vector<Future<Version>> restores;
if (deterministicRandom()->random01() < 0.5) {
for (auto& range : self->backupRanges)
restores.push_back(backupAgent.atomicRestore(cx, BackupAgentBase::getDefaultTag(), range,
StringRef(), StringRef()));
} else {
restores.push_back(backupAgent.atomicRestore(cx, BackupAgentBase::getDefaultTag(),
self->backupRanges, StringRef(), StringRef()));
}
try {
wait(waitForAll(restores));
break;
} catch (Error& e) {
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) throw;
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
else {
restores.push_back(backupAgent.atomicRestore(cx, BackupAgentBase::getDefaultTag(), self->backupRanges, StringRef(), StringRef()));
}
try {
wait(waitForAll(restores));
break;
}
catch (Error& e) {
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate)
throw;
}
wait( delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY) );
}
// SOMEDAY: Remove after backup agents can exist quiescently
if (g_simulator.backupAgents == ISimulator::BackupToFile) {
g_simulator.backupAgents = ISimulator::NoBackupAgents;

View File

@ -453,39 +453,15 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
state std::vector<Future<Version>> restores;
state std::vector<Standalone<StringRef>> restoreTags;
// Restore each range by calling backupAgent.restore()
// Submit parallel restore requests
TraceEvent("FastRestore").detail("PrepareRestores", self->backupRanges.size());
loop {
state ReadYourWritesTransaction tr1(cx);
tr1.reset();
tr1.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr1.setOption(FDBTransactionOptions::LOCK_AWARE);
try {
// Note: we always lock the DB here in case the DB is modified at the backupRanges boundary.
for (restoreIndex = 0; restoreIndex < self->backupRanges.size(); restoreIndex++) {
auto range = self->backupRanges[restoreIndex];
Standalone<StringRef> restoreTag(self->backupTag.toString() + "_" +
std::to_string(restoreIndex));
restoreTags.push_back(restoreTag);
// Register the restore request in the DB, which will be picked up by the restore worker leader
struct RestoreRequest restoreRequest(
restoreIndex, restoreTag, KeyRef(lastBackupContainer->getURL()), true, targetVersion,
true, range, Key(), Key(), self->locked, deterministicRandom()->randomUniqueID());
tr1.set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
}
tr1.set(restoreRequestTriggerKey,
restoreRequestTriggerValue(deterministicRandom()->randomUniqueID(),
self->backupRanges.size()));
wait(tr1.commit()); // Trigger restore
break;
} catch (Error& e) {
wait(tr1.onError(e));
}
};
wait(backupAgent.submitParallelRestore(cx, self->backupTag, self->backupRanges,
KeyRef(lastBackupContainer->getURL()), targetVersion,
self->locked, randomID));
TraceEvent("FastRestore").detail("TriggerRestore", "Setting up restoreRequestTriggerKey");
// Sometimes kill and restart the restore
// In a real cluster, aborting a restore requires:
// (1) killing the restore cluster; (2) clearing the destination DB's restore system keyspace.
// TODO: Consider gracefully aborting a restore and restarting it.
if (BUGGIFY && TEST_ABORT_FASTRESTORE) {
@ -509,34 +485,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
}
}
// We should wait for all restores to finish before proceeding
// Wait for parallel restore to finish before we can proceed
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
restoreDone = false;
state Future<Void> watchForRestoreRequestDone;
loop {
try {
if (restoreDone) break;
tr2.reset();
tr2.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr2.setOption(FDBTransactionOptions::LOCK_AWARE);
Optional<Value> restoreRequestDoneKeyValue = wait(tr2.get(restoreRequestDoneKey));
// Restore may finish before restoreAgent waits on the restore finish event.
if (restoreRequestDoneKeyValue.present()) {
restoreDone = true; // In case the commit clears the key but fails with an unknown commit result
tr2.clear(restoreRequestDoneKey);
wait(tr2.commit());
break;
} else {
watchForRestoreRequestDone = tr2.watch(restoreRequestDoneKey);
wait(tr2.commit());
wait(watchForRestoreRequestDone);
break;
}
} catch (Error& e) {
wait(tr2.onError(e));
}
}
wait(backupAgent.parallelRestoreFinish(cx, randomID));
TraceEvent("FastRestore").detail("BackupAndParallelRestore", "RestoreFinished");
for (auto& restore : restores) {

View File

@ -210,6 +210,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessAtomicOpTinyData.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessCycle.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreCorrectnessMultiCycles.txt)
add_fdb_test(TEST_FILES slow/ParallelRestoreApiCorrectnessAtomicRestore.txt)
# Note that status tests are not deterministic.
add_fdb_test(TEST_FILES status/invalid_proc_addresses.txt)
add_fdb_test(TEST_FILES status/local_6_machine_no_replicas_remain.txt)

View File

@ -25,4 +25,4 @@ testName=AtomicRestore
startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile

View File

@ -0,0 +1,36 @@
testTitle=ApiCorrectnessTest
testName=ApiCorrectness
runSetup=true
clearAfterTest=true
numKeys=5000
onlyLowerCase=true
shortKeysRatio=0.5
minShortKeyLength=1
maxShortKeyLength=3
minLongKeyLength=1
maxLongKeyLength=128
minValueLength=1
maxValueLength=1000
numGets=1000
numGetRanges=100
numGetRangeSelectors=100
numGetKeys=100
numClears=100
numClearRanges=10
maxTransactionBytes=500000
randomTestDuration=60
timeout=2100
testName=AtomicRestore
startAfter=10.0
restoreAfter=50.0
clearAfterTest=false
simBackupAgents=BackupToFile
fastRestore=true
; Each testName=RunRestoreWorkerWorkload creates a restore worker
; We need at least 3 restore workers: master, loader, and applier
testName=RunRestoreWorkerWorkload
;timeout is in seconds
timeout=360000