Merge pull request #3041 from xumengpanda/mengxu/sim-restore-agent

Performant restore [31/xx]: Fix for fast restore agent
This commit is contained in:
Jingyu Zhou 2020-04-28 20:28:18 -07:00 committed by GitHub
commit f4010c632a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 123 deletions

View File

@ -2192,8 +2192,7 @@ ACTOR Future<Void> runRestore(Database db, std::string originalClusterFile, std:
// Fast restore agent that kicks off the restore: send restore requests to restore workers.
ACTOR Future<Void> runFastRestoreAgent(Database db, std::string tagName, std::string container,
Standalone<VectorRef<KeyRangeRef>> ranges, Version dbVersion,
bool performRestore, bool verbose, bool waitForDone, std::string addPrefix,
std::string removePrefix) {
bool performRestore, bool verbose, bool waitForDone) {
try {
state FileBackupAgent backupAgent;
state Version restoreVersion = invalidVersion;
@ -2219,9 +2218,26 @@ ACTOR Future<Void> runFastRestoreAgent(Database db, std::string tagName, std::st
dbVersion = desc.maxRestorableVersion.get();
TraceEvent("FastRestoreAgent").detail("TargetRestoreVersion", dbVersion);
}
Version _restoreVersion = wait(fastRestore(db, KeyRef(tagName), KeyRef(container), waitForDone, dbVersion,
verbose, range, KeyRef(addPrefix), KeyRef(removePrefix)));
restoreVersion = _restoreVersion;
state UID randomUID = deterministicRandom()->randomUniqueID();
TraceEvent("FastRestoreAgent")
.detail("SubmitRestoreRequests", ranges.size())
.detail("RestoreUID", randomUID);
wait(backupAgent.submitParallelRestore(db, KeyRef(tagName), ranges, KeyRef(container), dbVersion, true,
randomUID));
if (waitForDone) {
// Wait for parallel restore to finish and unlock DB after that
TraceEvent("FastRestoreAgent").detail("BackupAndParallelRestore", "WaitForRestoreToFinish");
wait(backupAgent.parallelRestoreFinish(db, randomUID));
TraceEvent("FastRestoreAgent").detail("BackupAndParallelRestore", "RestoreFinished");
} else {
TraceEvent("FastRestoreAgent")
.detail("RestoreUID", randomUID)
.detail("OperationGuide", "Manually unlock DB when restore finishes");
printf("WARNING: DB will be in locked state after restore. Need UID:%s to unlock DB\n",
randomUID.toString().c_str());
}
restoreVersion = dbVersion;
} else {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(container);
state BackupDescription description = wait(bc->describeBackup());
@ -3740,7 +3756,7 @@ int main(int argc, char* argv[]) {
switch (restoreType) {
case RESTORE_START:
f = stopAfter(runFastRestoreAgent(db, tagName, restoreContainer, backupKeys, restoreVersion, !dryRun,
!quietDisplay, waitForDone, addPrefix, removePrefix));
!quietDisplay, waitForDone));
break;
case RESTORE_WAIT:
printf("[TODO][ERROR] FastRestore does not support RESTORE_WAIT yet!\n");
@ -3887,102 +3903,3 @@ int main(int argc, char* argv[]) {
flushAndExit(status);
}
//------Restore Agent: Kick off the restore by sending the restore requests
ACTOR static Future<FileBackupAgent::ERestoreState> waitFastRestore(Database cx, Key tagName, bool verbose) {
// We should wait on all restore to finish before proceeds
TraceEvent("FastRestore").detail("Progress", "WaitForRestoreToFinish");
state ReadYourWritesTransaction tr(cx);
state Future<Void> fRestoreRequestDone;
state bool restoreRequestDone = false;
loop {
try {
tr.reset();
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
// In case restoreRequestDoneKey is already set before we set watch on it
Optional<Value> restoreRequestDoneKeyValue = wait(tr.get(restoreRequestDoneKey));
if (restoreRequestDoneKeyValue.present()) {
restoreRequestDone = true;
tr.clear(restoreRequestDoneKey);
wait(tr.commit());
break;
} else if (!restoreRequestDone) {
fRestoreRequestDone = tr.watch(restoreRequestDoneKey);
wait(tr.commit());
wait(fRestoreRequestDone);
} else {
break;
}
} catch (Error& e) {
wait(tr.onError(e));
}
}
TraceEvent("FastRestore").detail("Progress", "RestoreFinished");
return FileBackupAgent::ERestoreState::COMPLETED;
}
ACTOR static Future<Version> _fastRestore(Database cx, Key tagName, Key url, bool waitForComplete,
Version targetVersion, bool verbose, KeyRange range, Key addPrefix,
Key removePrefix) {
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
if (targetVersion == invalidVersion && desc.maxRestorableVersion.present())
targetVersion = desc.maxRestorableVersion.get();
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));
TraceEvent("FastRestore").detail("BackupDesc", desc.toString()).detail("TargetVersion", targetVersion);
if (!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
.detail("BackupContainer", bc->getURL())
.detail("TargetVersion", targetVersion);
throw restore_invalid_version();
}
// NOTE: The restore agent makes sure we only support 1 restore range for each restore request for now!
// The simulation test did test restoring multiple restore ranges in one restore request though.
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int restoreIndex = 0;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
Standalone<StringRef> restoreTag(tagName.toString() + "_" + std::to_string(restoreIndex));
bool locked = true;
struct RestoreRequest restoreRequest(restoreIndex, restoreTag, KeyRef(bc->getURL()), true, targetVersion,
true, range, Key(), Key(), locked,
deterministicRandom()->randomUniqueID());
tr->set(restoreRequestKeyFor(restoreRequest.index), restoreRequestValue(restoreRequest));
// backupRanges.size = 1 because we only support restoring 1 range in real mode for now
tr->set(restoreRequestTriggerKey, restoreRequestTriggerValue(deterministicRandom()->randomUniqueID(),1));
wait(tr->commit()); // Trigger fast restore
break;
} catch (Error& e) {
if (e.code() != error_code_restore_duplicate_tag) {
wait(tr->onError(e));
}
}
}
if (waitForComplete) {
FileBackupAgent::ERestoreState finalState = wait(waitFastRestore(cx, tagName, verbose));
if (finalState != FileBackupAgent::ERestoreState::COMPLETED) throw restore_error();
}
return targetVersion;
}
ACTOR Future<Version> fastRestore(Database cx, Standalone<StringRef> tagName, Standalone<StringRef> url,
bool waitForComplete, long targetVersion, bool verbose, Standalone<KeyRangeRef> range,
Standalone<StringRef> addPrefix, Standalone<StringRef> removePrefix) {
Version result =
wait(_fastRestore(cx, tagName, url, waitForComplete, targetVersion, verbose, range, addPrefix, removePrefix));
return result;
}

View File

@ -893,10 +893,6 @@ public:
}
};
ACTOR Future<Version> fastRestore(Database cx, Standalone<StringRef> tagName, Standalone<StringRef> url,
bool waitForComplete, long targetVersion, bool verbose, Standalone<KeyRangeRef> range,
Standalone<StringRef> addPrefix, Standalone<StringRef> removePrefix);
// Helper class for reading restore data from a buffer and throwing the right errors.
struct StringRefReader {
StringRefReader(StringRef s = StringRef(), Error e = Error()) : rptr(s.begin()), end(s.end()), failure_error(e) {}

View File

@ -3628,6 +3628,29 @@ public:
ACTOR static Future<Void> submitParallelRestore(Database cx, Key backupTag,
Standalone<VectorRef<KeyRangeRef>> backupRanges, KeyRef bcUrl,
Version targetVersion, bool lockDB, UID randomUID) {
// Sanity check backup is valid
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(bcUrl.toString());
state BackupDescription desc = wait(bc->describeBackup());
wait(desc.resolveVersionTimes(cx));
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));
if (!restoreSet.present()) {
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
.detail("BackupContainer", bc->getURL())
.detail("TargetVersion", targetVersion);
throw restore_invalid_version();
}
if (targetVersion == invalidVersion && desc.maxRestorableVersion.present()) {
targetVersion = desc.maxRestorableVersion.get();
TraceEvent(SevWarn, "FastRestoreSubmitRestoreRequestWithInvalidTargetVersion")
.detail("OverrideTargetVersion", targetVersion);
}
TraceEvent("FastRestoreSubmitRestoreRequest")
.detail("BackupDesc", desc.toString())
.detail("TargetVersion", targetVersion);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state int restoreIndex = 0;
state int numTries = 0;

View File

@ -566,19 +566,19 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
// Fast Restore
init( FASTRESTORE_FAILURE_TIMEOUT, 3600 );
init( FASTRESTORE_HEARTBEAT_INTERVAL, 60 );
init( FASTRESTORE_SAMPLING_PERCENT, 1 ); if( randomize ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 512.0 ); if( randomize ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
init( FASTRESTORE_VB_PARALLELISM, 3 ); if( randomize ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_SAMPLING_PERCENT, 1 ); if( randomize && BUGGIFY ) { FASTRESTORE_SAMPLING_PERCENT = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_NUM_LOADERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_LOADERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_NUM_APPLIERS, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_NUM_APPLIERS = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_TXN_BATCH_MAX_BYTES, 512.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_TXN_BATCH_MAX_BYTES = deterministicRandom()->random01() * 1024.0 * 1024.0 + 1.0; }
init( FASTRESTORE_VERSIONBATCH_MAX_BYTES, 10.0 * 1024.0 * 1024.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_VERSIONBATCH_MAX_BYTES = deterministicRandom()->random01() * 10.0 * 1024.0 * 1024.0 * 1024.0; }
init( FASTRESTORE_VB_PARALLELISM, 3 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_PARALLELISM = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_MONITOR_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_MONITOR_DELAY = deterministicRandom()->random01() * 20 + 1; }
init( FASTRESTORE_VB_LAUNCH_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_VB_LAUNCH_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ROLE_LOGGING_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_ROLE_LOGGING_DELAY = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_UPDATE_PROCESS_STATS_INTERVAL = deterministicRandom()->random01() * 60 + 1; }
init( FASTRESTORE_ATOMICOP_WEIGHT, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_ATOMICOP_WEIGHT = deterministicRandom()->random01() * 200 + 1; }
init( FASTRESTORE_APPLYING_PARALLELISM, 100 ); if( randomize && BUGGIFY ) { FASTRESTORE_APPLYING_PARALLELISM = deterministicRandom()->random01() * 10 + 1; }
init( FASTRESTORE_MONITOR_LEADER_DELAY, 5 ); if( randomize && BUGGIFY ) { FASTRESTORE_MONITOR_LEADER_DELAY = deterministicRandom()->random01() * 100; }
init( FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS, 60 ); if( randomize && BUGGIFY ) { FASTRESTORE_STRAGGLER_THRESHOLD_SECONDS = deterministicRandom()->random01() * 240 + 10; }
init( FASTRESTORE_TRACK_REQUEST_LATENCY, true ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_REQUEST_LATENCY = false; }
init( FASTRESTORE_TRACK_LOADER_SEND_REQUESTS, false ); if( randomize && BUGGIFY ) { FASTRESTORE_TRACK_LOADER_SEND_REQUESTS = true; }

View File

@ -147,7 +147,10 @@ ACTOR Future<Void> collectRestoreWorkerInterface(Reference<RestoreWorkerData> se
}
break;
}
TraceEvent("FastRestore").suppressFor(10.0).detail("NotEnoughWorkers", agentValues.size());
TraceEvent("FastRestore")
.suppressFor(10.0)
.detail("NotEnoughWorkers", agentValues.size())
.detail("MinWorkers", min_num_workers);
wait(delay(5.0));
} catch (Error& e) {
wait(tr.onError(e));