Merge branch 'master' of https://github.com/apple/foundationdb into fdbtest
This commit is contained in:
commit
916c4ef628
|
@ -96,29 +96,67 @@ enum enumRestoreType {
|
|||
//
|
||||
enum {
|
||||
// Backup constants
|
||||
OPT_DESTCONTAINER, OPT_SNAPSHOTINTERVAL, OPT_ERRORLIMIT, OPT_NOSTOPWHENDONE,
|
||||
OPT_EXPIRE_BEFORE_VERSION, OPT_EXPIRE_BEFORE_DATETIME, OPT_EXPIRE_DELETE_BEFORE_DAYS,
|
||||
OPT_EXPIRE_RESTORABLE_AFTER_VERSION, OPT_EXPIRE_RESTORABLE_AFTER_DATETIME, OPT_EXPIRE_MIN_RESTORABLE_DAYS,
|
||||
OPT_BASEURL, OPT_BLOB_CREDENTIALS, OPT_DESCRIBE_DEEP, OPT_DESCRIBE_TIMESTAMPS,
|
||||
OPT_DUMP_BEGIN, OPT_DUMP_END, OPT_JSON, OPT_DELETE_DATA, OPT_MIN_CLEANUP_SECONDS,
|
||||
OPT_DESTCONTAINER,
|
||||
OPT_SNAPSHOTINTERVAL,
|
||||
OPT_ERRORLIMIT,
|
||||
OPT_NOSTOPWHENDONE,
|
||||
OPT_EXPIRE_BEFORE_VERSION,
|
||||
OPT_EXPIRE_BEFORE_DATETIME,
|
||||
OPT_EXPIRE_DELETE_BEFORE_DAYS,
|
||||
OPT_EXPIRE_RESTORABLE_AFTER_VERSION,
|
||||
OPT_EXPIRE_RESTORABLE_AFTER_DATETIME,
|
||||
OPT_EXPIRE_MIN_RESTORABLE_DAYS,
|
||||
OPT_BASEURL,
|
||||
OPT_BLOB_CREDENTIALS,
|
||||
OPT_DESCRIBE_DEEP,
|
||||
OPT_DESCRIBE_TIMESTAMPS,
|
||||
OPT_DUMP_BEGIN,
|
||||
OPT_DUMP_END,
|
||||
OPT_JSON,
|
||||
OPT_DELETE_DATA,
|
||||
OPT_MIN_CLEANUP_SECONDS,
|
||||
OPT_USE_PARTITIONED_LOG,
|
||||
|
||||
// Backup and Restore constants
|
||||
OPT_TAGNAME, OPT_BACKUPKEYS, OPT_WAITFORDONE,
|
||||
OPT_TAGNAME,
|
||||
OPT_BACKUPKEYS,
|
||||
OPT_WAITFORDONE,
|
||||
OPT_INCREMENTALONLY,
|
||||
|
||||
// Backup Modify
|
||||
OPT_MOD_ACTIVE_INTERVAL, OPT_MOD_VERIFY_UID,
|
||||
OPT_MOD_ACTIVE_INTERVAL,
|
||||
OPT_MOD_VERIFY_UID,
|
||||
|
||||
// Restore constants
|
||||
OPT_RESTORECONTAINER, OPT_RESTORE_VERSION, OPT_RESTORE_TIMESTAMP, OPT_PREFIX_ADD, OPT_PREFIX_REMOVE, OPT_RESTORE_CLUSTERFILE_DEST, OPT_RESTORE_CLUSTERFILE_ORIG,
|
||||
OPT_RESTORECONTAINER,
|
||||
OPT_RESTORE_VERSION,
|
||||
OPT_RESTORE_TIMESTAMP,
|
||||
OPT_PREFIX_ADD,
|
||||
OPT_PREFIX_REMOVE,
|
||||
OPT_RESTORE_CLUSTERFILE_DEST,
|
||||
OPT_RESTORE_CLUSTERFILE_ORIG,
|
||||
OPT_RESTORE_BEGIN_VERSION,
|
||||
|
||||
// Shared constants
|
||||
OPT_CLUSTERFILE, OPT_QUIET, OPT_DRYRUN, OPT_FORCE,
|
||||
OPT_HELP, OPT_DEVHELP, OPT_VERSION, OPT_PARENTPID, OPT_CRASHONERROR,
|
||||
OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACE, OPT_TRACE_DIR,
|
||||
OPT_KNOB, OPT_TRACE_LOG_GROUP, OPT_MEMLIMIT, OPT_LOCALITY,
|
||||
OPT_CLUSTERFILE,
|
||||
OPT_QUIET,
|
||||
OPT_DRYRUN,
|
||||
OPT_FORCE,
|
||||
OPT_HELP,
|
||||
OPT_DEVHELP,
|
||||
OPT_VERSION,
|
||||
OPT_PARENTPID,
|
||||
OPT_CRASHONERROR,
|
||||
OPT_NOBUFSTDOUT,
|
||||
OPT_BUFSTDOUTERR,
|
||||
OPT_TRACE,
|
||||
OPT_TRACE_DIR,
|
||||
OPT_KNOB,
|
||||
OPT_TRACE_LOG_GROUP,
|
||||
OPT_MEMLIMIT,
|
||||
OPT_LOCALITY,
|
||||
|
||||
//DB constants
|
||||
// DB constants
|
||||
OPT_SOURCE_CLUSTER,
|
||||
OPT_DEST_CLUSTER,
|
||||
OPT_CLEANUP,
|
||||
|
@ -154,7 +192,7 @@ CSimpleOpt::SOption g_rgAgentOptions[] = {
|
|||
#ifndef TLS_DISABLED
|
||||
TLS_OPTION_FLAGS
|
||||
#endif
|
||||
SO_END_OF_OPTIONS
|
||||
SO_END_OF_OPTIONS
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgBackupStartOptions[] = {
|
||||
|
@ -197,6 +235,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
|
|||
{ OPT_DEVHELP, "--dev-help", SO_NONE },
|
||||
{ OPT_KNOB, "--knob_", SO_REQ_SEP },
|
||||
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
|
||||
{ OPT_INCREMENTALONLY, "--incremental", SO_NONE },
|
||||
#ifndef TLS_DISABLED
|
||||
TLS_OPTION_FLAGS
|
||||
#endif
|
||||
|
@ -603,6 +642,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
|
|||
{ OPT_BACKUPKEYS, "--keys", SO_REQ_SEP },
|
||||
{ OPT_WAITFORDONE, "-w", SO_NONE },
|
||||
{ OPT_WAITFORDONE, "--waitfordone", SO_NONE },
|
||||
{ OPT_RESTORE_BEGIN_VERSION, "--begin_version", SO_REQ_SEP },
|
||||
{ OPT_RESTORE_VERSION, "--version", SO_REQ_SEP },
|
||||
{ OPT_RESTORE_VERSION, "-v", SO_REQ_SEP },
|
||||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
|
@ -622,6 +662,7 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
|
|||
{ OPT_HELP, "--help", SO_NONE },
|
||||
{ OPT_DEVHELP, "--dev-help", SO_NONE },
|
||||
{ OPT_BLOB_CREDENTIALS, "--blob_credentials", SO_REQ_SEP },
|
||||
{ OPT_INCREMENTALONLY, "--incremental", SO_NONE },
|
||||
#ifndef TLS_DISABLED
|
||||
TLS_OPTION_FLAGS
|
||||
#endif
|
||||
|
@ -975,6 +1016,9 @@ static void printBackupUsage(bool devhelp) {
|
|||
" remove mutations for it. By default this is set to one hour.\n");
|
||||
printf(" --delete_data\n"
|
||||
" This flag will cause cleanup to remove mutations for the most stale backup or DR.\n");
|
||||
// TODO: Enable this command-line argument once atomics are supported
|
||||
// printf(" --incremental\n"
|
||||
// " Performs incremental backup without the base backup.\n");
|
||||
#ifndef TLS_DISABLED
|
||||
printf(TLS_HELP);
|
||||
#endif
|
||||
|
@ -1032,8 +1076,11 @@ static void printRestoreUsage(bool devhelp ) {
|
|||
printf(" --trace_format FORMAT\n"
|
||||
" Select the format of the trace files. xml (the default) and json are supported.\n"
|
||||
" Has no effect unless --log is specified.\n");
|
||||
// TODO: Enable this command-line argument once atomics are supported
|
||||
// printf(" --incremental\n"
|
||||
// " Performs incremental restore without the base backup.\n");
|
||||
#ifndef TLS_DISABLED
|
||||
printf(TLS_HELP);
|
||||
printf(TLS_HELP);
|
||||
#endif
|
||||
printf(" -v DBVERSION The version at which the database will be restored.\n");
|
||||
printf(" --timestamp Instead of a numeric version, use this to specify a timestamp in %s\n", BackupAgentBase::timeFormat().c_str());
|
||||
|
@ -1721,7 +1768,8 @@ ACTOR Future<Void> submitDBBackup(Database src, Database dest, Standalone<Vector
|
|||
|
||||
ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotIntervalSeconds,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, std::string tagName, bool dryRun,
|
||||
bool waitForCompletion, bool stopWhenDone, bool usePartitionedLog) {
|
||||
bool waitForCompletion, bool stopWhenDone, bool usePartitionedLog,
|
||||
bool incrementalBackupOnly) {
|
||||
try {
|
||||
state FileBackupAgent backupAgent;
|
||||
|
||||
|
@ -1766,7 +1814,7 @@ ACTOR Future<Void> submitBackup(Database db, std::string url, int snapshotInterv
|
|||
|
||||
else {
|
||||
wait(backupAgent.submitBackup(db, KeyRef(url), snapshotIntervalSeconds, tagName, backupRanges, stopWhenDone,
|
||||
usePartitionedLog));
|
||||
usePartitionedLog, incrementalBackupOnly));
|
||||
|
||||
// Wait for the backup to complete, if requested
|
||||
if (waitForCompletion) {
|
||||
|
@ -2077,7 +2125,10 @@ Reference<IBackupContainer> openBackupContainer(const char *name, std::string de
|
|||
return c;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> runRestore(Database db, std::string originalClusterFile, std::string tagName, std::string container, Standalone<VectorRef<KeyRangeRef>> ranges, Version targetVersion, std::string targetTimestamp, bool performRestore, bool verbose, bool waitForDone, std::string addPrefix, std::string removePrefix) {
|
||||
ACTOR Future<Void> runRestore(Database db, std::string originalClusterFile, std::string tagName, std::string container,
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges, Version beginVersion, Version targetVersion,
|
||||
std::string targetTimestamp, bool performRestore, bool verbose, bool waitForDone,
|
||||
std::string addPrefix, std::string removePrefix, bool incrementalBackupOnly) {
|
||||
if(ranges.empty()) {
|
||||
ranges.push_back_deep(ranges.arena(), normalKeys);
|
||||
}
|
||||
|
@ -2119,19 +2170,23 @@ ACTOR Future<Void> runRestore(Database db, std::string originalClusterFile, std:
|
|||
|
||||
BackupDescription desc = wait(bc->describeBackup());
|
||||
|
||||
if(!desc.maxRestorableVersion.present()) {
|
||||
if (incrementalBackupOnly && desc.contiguousLogEnd.present()) {
|
||||
targetVersion = desc.contiguousLogEnd.get() - 1;
|
||||
} else if (desc.maxRestorableVersion.present()) {
|
||||
targetVersion = desc.maxRestorableVersion.get();
|
||||
} else {
|
||||
fprintf(stderr, "The specified backup is not restorable to any version.\n");
|
||||
throw restore_error();
|
||||
}
|
||||
|
||||
targetVersion = desc.maxRestorableVersion.get();
|
||||
|
||||
if(verbose)
|
||||
printf("Using target restore version %" PRId64 "\n", targetVersion);
|
||||
}
|
||||
|
||||
if (performRestore) {
|
||||
Version restoredVersion = wait(backupAgent.restore(db, origDb, KeyRef(tagName), KeyRef(container), ranges, waitForDone, targetVersion, verbose, KeyRef(addPrefix), KeyRef(removePrefix)));
|
||||
Version restoredVersion = wait(backupAgent.restore(
|
||||
db, origDb, KeyRef(tagName), KeyRef(container), ranges, waitForDone, targetVersion, verbose,
|
||||
KeyRef(addPrefix), KeyRef(removePrefix), true, incrementalBackupOnly, beginVersion));
|
||||
|
||||
if(waitForDone && verbose) {
|
||||
// If restore is now complete then report version restored
|
||||
|
@ -2910,11 +2965,13 @@ int main(int argc, char* argv[]) {
|
|||
std::string removePrefix;
|
||||
Standalone<VectorRef<KeyRangeRef>> backupKeys;
|
||||
int maxErrors = 20;
|
||||
Version beginVersion = invalidVersion;
|
||||
Version restoreVersion = invalidVersion;
|
||||
std::string restoreTimestamp;
|
||||
bool waitForDone = false;
|
||||
bool stopWhenDone = true;
|
||||
bool usePartitionedLog = false; // Set to true to use new backup system
|
||||
bool incrementalBackupOnly = false;
|
||||
bool forceAction = false;
|
||||
bool trace = false;
|
||||
bool quietDisplay = false;
|
||||
|
@ -3167,6 +3224,10 @@ int main(int argc, char* argv[]) {
|
|||
case OPT_USE_PARTITIONED_LOG:
|
||||
usePartitionedLog = true;
|
||||
break;
|
||||
case OPT_INCREMENTALONLY:
|
||||
// TODO: Enable this command-line argument once atomics are supported
|
||||
// incrementalBackupOnly = true;
|
||||
break;
|
||||
case OPT_RESTORECONTAINER:
|
||||
restoreContainer = args->OptionArg();
|
||||
// If the url starts with '/' then prepend "file://" for backwards compatibility
|
||||
|
@ -3194,6 +3255,17 @@ int main(int argc, char* argv[]) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case OPT_RESTORE_BEGIN_VERSION: {
|
||||
const char* a = args->OptionArg();
|
||||
long long ver = 0;
|
||||
if (!sscanf(a, "%lld", &ver)) {
|
||||
fprintf(stderr, "ERROR: Could not parse database beginVersion `%s'\n", a);
|
||||
printHelpTeaser(argv[0]);
|
||||
return FDB_EXIT_ERROR;
|
||||
}
|
||||
beginVersion = ver;
|
||||
break;
|
||||
}
|
||||
case OPT_RESTORE_VERSION: {
|
||||
const char* a = args->OptionArg();
|
||||
long long ver = 0;
|
||||
|
@ -3567,7 +3639,8 @@ int main(int argc, char* argv[]) {
|
|||
// Test out the backup url to make sure it parses. Doesn't test to make sure it's actually writeable.
|
||||
openBackupContainer(argv[0], destinationContainer);
|
||||
f = stopAfter(submitBackup(db, destinationContainer, snapshotIntervalSeconds, backupKeys, tagName,
|
||||
dryRun, waitForDone, stopWhenDone, usePartitionedLog));
|
||||
dryRun, waitForDone, stopWhenDone, usePartitionedLog,
|
||||
incrementalBackupOnly));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -3697,7 +3770,9 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
switch(restoreType) {
|
||||
case RESTORE_START:
|
||||
f = stopAfter( runRestore(db, restoreClusterFileOrig, tagName, restoreContainer, backupKeys, restoreVersion, restoreTimestamp, !dryRun, !quietDisplay, waitForDone, addPrefix, removePrefix) );
|
||||
f = stopAfter(runRestore(db, restoreClusterFileOrig, tagName, restoreContainer, backupKeys,
|
||||
beginVersion, restoreVersion, restoreTimestamp, !dryRun, !quietDisplay,
|
||||
waitForDone, addPrefix, removePrefix, incrementalBackupOnly));
|
||||
break;
|
||||
case RESTORE_WAIT:
|
||||
f = stopAfter( success(ba.waitRestore(db, KeyRef(tagName), true)) );
|
||||
|
|
|
@ -286,11 +286,19 @@ public:
|
|||
// - submit a restore on the given tagName
|
||||
// - Optionally wait for the restore's completion. Will restore_error if restore fails or is aborted.
|
||||
// restore() will return the targetVersion which will be either the valid version passed in or the max restorable version for the given url.
|
||||
Future<Version> restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete = true, Version targetVersion = -1, bool verbose = true, Key addPrefix = Key(), Key removePrefix = Key(), bool lockDB = true);
|
||||
Future<Version> restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, bool waitForComplete = true, Version targetVersion = -1, bool verbose = true, KeyRange range = normalKeys, Key addPrefix = Key(), Key removePrefix = Key(), bool lockDB = true) {
|
||||
Future<Version> restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url,
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete = true,
|
||||
Version targetVersion = -1, bool verbose = true, Key addPrefix = Key(),
|
||||
Key removePrefix = Key(), bool lockDB = true, bool incrementalBackupOnly = false,
|
||||
Version beginVersion = -1);
|
||||
Future<Version> restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, bool waitForComplete = true,
|
||||
Version targetVersion = -1, bool verbose = true, KeyRange range = normalKeys,
|
||||
Key addPrefix = Key(), Key removePrefix = Key(), bool lockDB = true,
|
||||
bool incrementalBackupOnly = false, Version beginVersion = -1) {
|
||||
Standalone<VectorRef<KeyRangeRef>> rangeRef;
|
||||
rangeRef.push_back_deep(rangeRef.arena(), range);
|
||||
return restore(cx, cxOrig, tagName, url, rangeRef, waitForComplete, targetVersion, verbose, addPrefix, removePrefix, lockDB);
|
||||
return restore(cx, cxOrig, tagName, url, rangeRef, waitForComplete, targetVersion, verbose, addPrefix,
|
||||
removePrefix, lockDB, incrementalBackupOnly, beginVersion);
|
||||
}
|
||||
Future<Version> atomicRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges, Key addPrefix = Key(), Key removePrefix = Key());
|
||||
Future<Version> atomicRestore(Database cx, Key tagName, KeyRange range = normalKeys, Key addPrefix = Key(), Key removePrefix = Key()) {
|
||||
|
@ -315,13 +323,14 @@ public:
|
|||
|
||||
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr, Key outContainer, int snapshotIntervalSeconds,
|
||||
std::string tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges,
|
||||
bool stopWhenDone = true, bool partitionedLog = false);
|
||||
bool stopWhenDone = true, bool partitionedLog = false,
|
||||
bool incrementalBackupOnly = false);
|
||||
Future<Void> submitBackup(Database cx, Key outContainer, int snapshotIntervalSeconds, std::string tagName,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone = true,
|
||||
bool partitionedLog = false) {
|
||||
bool partitionedLog = false, bool incrementalBackupOnly = false) {
|
||||
return runRYWTransactionFailIfLocked(cx, [=](Reference<ReadYourWritesTransaction> tr) {
|
||||
return submitBackup(tr, outContainer, snapshotIntervalSeconds, tagName, backupRanges, stopWhenDone,
|
||||
partitionedLog);
|
||||
partitionedLog, incrementalBackupOnly);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -810,6 +819,11 @@ public:
|
|||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
// Set to true if only requesting incremental backup without base snapshot.
|
||||
KeyBackedProperty<bool> incrementalBackupOnly() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
||||
// Latest version for which all prior versions have saved by backup workers.
|
||||
KeyBackedProperty<Version> latestBackupWorkerSavedVersion() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
|
@ -847,17 +861,25 @@ public:
|
|||
auto workerEnabled = backupWorkerEnabled().get(tr);
|
||||
auto plogEnabled = partitionedLogEnabled().get(tr);
|
||||
auto workerVersion = latestBackupWorkerSavedVersion().get(tr);
|
||||
return map(success(lastLog) && success(firstSnapshot) && success(workerEnabled) && success(plogEnabled) && success(workerVersion), [=](Void) -> Optional<Version> {
|
||||
// The latest log greater than the oldest snapshot is the restorable version
|
||||
Optional<Version> logVersion = workerEnabled.get().present() && workerEnabled.get().get() &&
|
||||
plogEnabled.get().present() && plogEnabled.get().get()
|
||||
? workerVersion.get()
|
||||
: lastLog.get();
|
||||
if (logVersion.present() && firstSnapshot.get().present() && logVersion.get() > firstSnapshot.get().get()) {
|
||||
return std::max(logVersion.get() - 1, firstSnapshot.get().get());
|
||||
}
|
||||
return {};
|
||||
});
|
||||
auto incrementalBackup = incrementalBackupOnly().get(tr);
|
||||
return map(success(lastLog) && success(firstSnapshot) && success(workerEnabled) && success(plogEnabled) &&
|
||||
success(workerVersion) && success(incrementalBackup),
|
||||
[=](Void) -> Optional<Version> {
|
||||
// The latest log greater than the oldest snapshot is the restorable version
|
||||
Optional<Version> logVersion = workerEnabled.get().present() && workerEnabled.get().get() &&
|
||||
plogEnabled.get().present() && plogEnabled.get().get()
|
||||
? workerVersion.get()
|
||||
: lastLog.get();
|
||||
if (logVersion.present() && firstSnapshot.get().present() &&
|
||||
logVersion.get() > firstSnapshot.get().get()) {
|
||||
return std::max(logVersion.get() - 1, firstSnapshot.get().get());
|
||||
}
|
||||
if (logVersion.present() && incrementalBackup.isReady() && incrementalBackup.get().present() &&
|
||||
incrementalBackup.get().get()) {
|
||||
return logVersion.get() - 1;
|
||||
}
|
||||
return {};
|
||||
});
|
||||
}
|
||||
|
||||
KeyBackedProperty<std::vector<KeyRange>> backupRanges() {
|
||||
|
|
|
@ -1365,7 +1365,32 @@ public:
|
|||
return getSnapshotFileKeyRange_impl(Reference<BackupContainerFileSystem>::addRef(this), file);
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc, Version targetVersion) {
|
||||
static Optional<RestorableFileSet> getRestoreSetFromLogs(std::vector<LogFile> logs, Version targetVersion,
|
||||
RestorableFileSet restorable) {
|
||||
Version end = logs.begin()->endVersion;
|
||||
computeRestoreEndVersion(logs, &restorable.logs, &end, targetVersion);
|
||||
if (end >= targetVersion) {
|
||||
restorable.continuousBeginVersion = logs.begin()->beginVersion;
|
||||
restorable.continuousEndVersion = end;
|
||||
return Optional<RestorableFileSet>(restorable);
|
||||
}
|
||||
return Optional<RestorableFileSet>();
|
||||
}
|
||||
|
||||
ACTOR static Future<Optional<RestorableFileSet>> getRestoreSet_impl(Reference<BackupContainerFileSystem> bc,
|
||||
Version targetVersion, bool logsOnly,
|
||||
Version beginVersion) {
|
||||
if (logsOnly) {
|
||||
state RestorableFileSet restorableSet;
|
||||
state std::vector<LogFile> logFiles;
|
||||
Version begin = beginVersion == invalidVersion ? 0 : beginVersion;
|
||||
wait(store(logFiles, bc->listLogFiles(begin, targetVersion, false)));
|
||||
// List logs in version order so log continuity can be analyzed
|
||||
std::sort(logFiles.begin(), logFiles.end());
|
||||
if (!logFiles.empty()) {
|
||||
return getRestoreSetFromLogs(logFiles, targetVersion, restorableSet);
|
||||
}
|
||||
}
|
||||
// Find the most recent keyrange snapshot to end at or before targetVersion
|
||||
state Optional<KeyspaceSnapshotFile> snapshot;
|
||||
std::vector<KeyspaceSnapshotFile> snapshots = wait(bc->listKeyspaceSnapshots());
|
||||
|
@ -1436,21 +1461,17 @@ public:
|
|||
|
||||
// If there are logs and the first one starts at or before the snapshot begin version then proceed
|
||||
if(!logs.empty() && logs.front().beginVersion <= snapshot.get().beginVersion) {
|
||||
Version end = logs.begin()->endVersion;
|
||||
computeRestoreEndVersion(logs, &restorable.logs, &end, targetVersion);
|
||||
if (end >= targetVersion) {
|
||||
restorable.continuousBeginVersion = logs.begin()->beginVersion;
|
||||
restorable.continuousEndVersion = end;
|
||||
return Optional<RestorableFileSet>(restorable);
|
||||
}
|
||||
return getRestoreSetFromLogs(logs, targetVersion, restorable);
|
||||
}
|
||||
}
|
||||
|
||||
return Optional<RestorableFileSet>();
|
||||
}
|
||||
|
||||
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion) final {
|
||||
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion);
|
||||
Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion, bool logsOnly,
|
||||
Version beginVersion) final {
|
||||
return getRestoreSet_impl(Reference<BackupContainerFileSystem>::addRef(this), targetVersion, logsOnly,
|
||||
beginVersion);
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -282,7 +282,8 @@ public:
|
|||
|
||||
// Get exactly the files necessary to restore to targetVersion. Returns non-present if
|
||||
// restore to given version is not possible.
|
||||
virtual Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion) = 0;
|
||||
virtual Future<Optional<RestorableFileSet>> getRestoreSet(Version targetVersion, bool logsOnly = false,
|
||||
Version beginVersion = -1) = 0;
|
||||
|
||||
// Get an IBackupContainer based on a container spec string
|
||||
static Reference<IBackupContainer> openContainer(std::string url);
|
||||
|
|
|
@ -131,6 +131,9 @@ public:
|
|||
KeyBackedProperty<Key> removePrefix() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
KeyBackedProperty<bool> incrementalBackupOnly() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
// XXX: Remove restoreRange() once it is safe to remove. It has been changed to restoreRanges
|
||||
KeyBackedProperty<KeyRange> restoreRange() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
|
@ -141,6 +144,9 @@ public:
|
|||
KeyBackedProperty<Key> batchFuture() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
KeyBackedProperty<Version> beginVersion() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
KeyBackedProperty<Version> restoreVersion() {
|
||||
return configSpace.pack(LiteralStringRef(__FUNCTION__));
|
||||
}
|
||||
|
@ -2474,7 +2480,8 @@ namespace fileBackup {
|
|||
state Future<std::vector<KeyRange>> backupRangesFuture = config.backupRanges().getOrThrow(tr);
|
||||
state Future<Key> destUidValueFuture = config.destUidValue().getOrThrow(tr);
|
||||
state Future<Optional<bool>> partitionedLog = config.partitionedLogEnabled().get(tr);
|
||||
wait(success(backupRangesFuture) && success(destUidValueFuture) && success(partitionedLog));
|
||||
state Future<Optional<bool>> incrementalBackupOnly = config.incrementalBackupOnly().get(tr);
|
||||
wait(success(backupRangesFuture) && success(destUidValueFuture) && success(partitionedLog) && success(incrementalBackupOnly));
|
||||
std::vector<KeyRange> backupRanges = backupRangesFuture.get();
|
||||
Key destUidValue = destUidValueFuture.get();
|
||||
|
||||
|
@ -2494,7 +2501,10 @@ namespace fileBackup {
|
|||
wait(config.initNewSnapshot(tr, 0));
|
||||
|
||||
// Using priority 1 for both of these to at least start both tasks soon
|
||||
wait(success(BackupSnapshotDispatchTask::addTask(tr, taskBucket, task, 1, TaskCompletionKey::joinWith(backupFinished))));
|
||||
// Do not add snapshot task if we only want the incremental backup
|
||||
if (!incrementalBackupOnly.get().present() || !incrementalBackupOnly.get().get()) {
|
||||
wait(success(BackupSnapshotDispatchTask::addTask(tr, taskBucket, task, 1, TaskCompletionKey::joinWith(backupFinished))));
|
||||
}
|
||||
wait(success(BackupLogsDispatchTask::addTask(tr, taskBucket, task, 1, 0, beginVersion, TaskCompletionKey::joinWith(backupFinished))));
|
||||
|
||||
// If a clean stop is requested, the log and snapshot tasks will quit after the backup is restorable, then the following
|
||||
|
@ -3008,8 +3018,10 @@ namespace fileBackup {
|
|||
state int64_t remainingInBatch = Params.remainingInBatch().get(task);
|
||||
state bool addingToExistingBatch = remainingInBatch > 0;
|
||||
state Version restoreVersion;
|
||||
state Future<Optional<bool>> incrementalBackupOnly = restore.incrementalBackupOnly().get(tr);
|
||||
|
||||
wait(store(restoreVersion, restore.restoreVersion().getOrThrow(tr))
|
||||
&& success(incrementalBackupOnly)
|
||||
&& checkTaskVersion(tr->getDatabase(), task, name, version));
|
||||
|
||||
// If not adding to an existing batch then update the apply mutations end version so the mutations from the
|
||||
|
@ -3398,6 +3410,7 @@ namespace fileBackup {
|
|||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
state RestoreConfig restore(task);
|
||||
state Version restoreVersion;
|
||||
state Version beginVersion;
|
||||
state Reference<IBackupContainer> bc;
|
||||
|
||||
loop {
|
||||
|
@ -3408,6 +3421,8 @@ namespace fileBackup {
|
|||
wait(checkTaskVersion(tr->getDatabase(), task, name, version));
|
||||
Version _restoreVersion = wait(restore.restoreVersion().getOrThrow(tr));
|
||||
restoreVersion = _restoreVersion;
|
||||
Optional<Version> _beginVersion = wait(restore.beginVersion().get(tr));
|
||||
beginVersion = _beginVersion.present() ? _beginVersion.get() : invalidVersion;
|
||||
wait(taskBucket->keepRunning(tr, task));
|
||||
|
||||
ERestoreState oldState = wait(restore.stateEnum().getD(tr));
|
||||
|
@ -3447,14 +3462,21 @@ namespace fileBackup {
|
|||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(restoreVersion));
|
||||
Optional<bool> _incremental = wait(restore.incrementalBackupOnly().get(tr));
|
||||
state bool incremental = _incremental.present() ? _incremental.get() : false;
|
||||
if (beginVersion == invalidVersion) {
|
||||
beginVersion = 0;
|
||||
}
|
||||
Optional<RestorableFileSet> restorable = wait(bc->getRestoreSet(restoreVersion, incremental, beginVersion));
|
||||
if (!incremental) {
|
||||
beginVersion = restorable.get().snapshot.beginVersion;
|
||||
}
|
||||
|
||||
if(!restorable.present())
|
||||
throw restore_missing_data();
|
||||
|
||||
// First version for which log data should be applied
|
||||
Params.firstVersion().set(task, restorable.get().snapshot.beginVersion);
|
||||
Params.firstVersion().set(task, beginVersion);
|
||||
|
||||
// Convert the two lists in restorable (logs and ranges) to a single list of RestoreFiles.
|
||||
// Order does not matter, they will be put in order when written to the restoreFileMap below.
|
||||
|
@ -3463,6 +3485,7 @@ namespace fileBackup {
|
|||
for(const RangeFile &f : restorable.get().ranges) {
|
||||
files.push_back({f.version, f.fileName, true, f.blockSize, f.fileSize});
|
||||
}
|
||||
|
||||
for(const LogFile &f : restorable.get().logs) {
|
||||
files.push_back({f.beginVersion, f.fileName, false, f.blockSize, f.fileSize, f.endVersion});
|
||||
}
|
||||
|
@ -3526,6 +3549,7 @@ namespace fileBackup {
|
|||
restore.stateEnum().set(tr, ERestoreState::RUNNING);
|
||||
|
||||
// Set applyMutation versions
|
||||
|
||||
restore.setApplyBeginVersion(tr, firstVersion);
|
||||
restore.setApplyEndVersion(tr, firstVersion);
|
||||
|
||||
|
@ -3533,6 +3557,14 @@ namespace fileBackup {
|
|||
wait(success(RestoreDispatchTaskFunc::addTask(tr, taskBucket, task, 0, "", 0, CLIENT_KNOBS->RESTORE_DISPATCH_BATCH_SIZE)));
|
||||
|
||||
wait(taskBucket->finish(tr, task));
|
||||
state Future<Optional<bool>> logsOnly = restore.incrementalBackupOnly().get(tr);
|
||||
wait(success(logsOnly));
|
||||
if (logsOnly.get().present() && logsOnly.get().get()) {
|
||||
// If this is an incremental restore, we need to set the applyMutationsMapPrefix
|
||||
// to the earliest log version so no mutations are missed
|
||||
Value versionEncoded = BinaryWriter::toValue(Params.firstVersion().get(task), Unversioned());
|
||||
wait(krmSetRange(tr, restore.applyMutationsMapPrefix(), normalKeys, versionEncoded));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -3760,7 +3792,7 @@ public:
|
|||
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr,
|
||||
Key outContainer, int snapshotIntervalSeconds, std::string tagName,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone,
|
||||
bool partitionedLog) {
|
||||
bool partitionedLog, bool incrementalBackupOnly) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
|
@ -3863,13 +3895,17 @@ public:
|
|||
config.backupRanges().set(tr, normalizedRanges);
|
||||
config.snapshotIntervalSeconds().set(tr, snapshotIntervalSeconds);
|
||||
config.partitionedLogEnabled().set(tr, partitionedLog);
|
||||
config.incrementalBackupOnly().set(tr, incrementalBackupOnly);
|
||||
|
||||
Key taskKey = wait(fileBackup::StartFullBackupTaskFunc::addTask(tr, backupAgent->taskBucket, uid, TaskCompletionKey::noSignal()));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> submitRestore(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr, Key tagName, Key backupURL, Standalone<VectorRef<KeyRangeRef>> ranges, Version restoreVersion, Key addPrefix, Key removePrefix, bool lockDB, UID uid) {
|
||||
ACTOR static Future<Void> submitRestore(FileBackupAgent* backupAgent, Reference<ReadYourWritesTransaction> tr,
|
||||
Key tagName, Key backupURL, Standalone<VectorRef<KeyRangeRef>> ranges,
|
||||
Version restoreVersion, Key addPrefix, Key removePrefix, bool lockDB,
|
||||
bool incrementalBackupOnly, Version beginVersion, UID uid) {
|
||||
KeyRangeMap<int> restoreRangeSet;
|
||||
for (auto& range : ranges) {
|
||||
restoreRangeSet.insert(range, 1);
|
||||
|
@ -3917,7 +3953,7 @@ public:
|
|||
for (index = 0; index < restoreRanges.size(); index++) {
|
||||
KeyRange restoreIntoRange = KeyRangeRef(restoreRanges[index].begin, restoreRanges[index].end).removePrefix(removePrefix).withPrefix(addPrefix);
|
||||
Standalone<RangeResultRef> existingRows = wait(tr->getRange(restoreIntoRange, 1));
|
||||
if (existingRows.size() > 0) {
|
||||
if (existingRows.size() > 0 && !incrementalBackupOnly) {
|
||||
throw restore_destination_not_empty();
|
||||
}
|
||||
}
|
||||
|
@ -3934,6 +3970,8 @@ public:
|
|||
restore.sourceContainer().set(tr, bc);
|
||||
restore.stateEnum().set(tr, ERestoreState::QUEUED);
|
||||
restore.restoreVersion().set(tr, restoreVersion);
|
||||
restore.incrementalBackupOnly().set(tr, incrementalBackupOnly);
|
||||
restore.beginVersion().set(tr, beginVersion);
|
||||
if (BUGGIFY && restoreRanges.size() == 1) {
|
||||
restore.restoreRange().set(tr, restoreRanges[0]);
|
||||
}
|
||||
|
@ -4451,7 +4489,8 @@ public:
|
|||
ACTOR static Future<Version> restore(FileBackupAgent* backupAgent, Database cx, Optional<Database> cxOrig,
|
||||
Key tagName, Key url, Standalone<VectorRef<KeyRangeRef>> ranges,
|
||||
bool waitForComplete, Version targetVersion, bool verbose, Key addPrefix,
|
||||
Key removePrefix, bool lockDB, UID randomUid) {
|
||||
Key removePrefix, bool lockDB, bool incrementalBackupOnly,
|
||||
Version beginVersion, UID randomUid) {
|
||||
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
|
||||
|
||||
state BackupDescription desc = wait(bc->describeBackup());
|
||||
|
@ -4463,7 +4502,12 @@ public:
|
|||
if(targetVersion == invalidVersion && desc.maxRestorableVersion.present())
|
||||
targetVersion = desc.maxRestorableVersion.get();
|
||||
|
||||
Optional<RestorableFileSet> restoreSet = wait(bc->getRestoreSet(targetVersion));
|
||||
if (targetVersion == invalidVersion && incrementalBackupOnly && desc.contiguousLogEnd.present()) {
|
||||
targetVersion = desc.contiguousLogEnd.get() - 1;
|
||||
}
|
||||
|
||||
Optional<RestorableFileSet> restoreSet =
|
||||
wait(bc->getRestoreSet(targetVersion, incrementalBackupOnly, beginVersion));
|
||||
|
||||
if(!restoreSet.present()) {
|
||||
TraceEvent(SevWarn, "FileBackupAgentRestoreNotPossible")
|
||||
|
@ -4482,7 +4526,8 @@ public:
|
|||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
wait(submitRestore(backupAgent, tr, tagName, url, ranges, targetVersion, addPrefix, removePrefix, lockDB, randomUid));
|
||||
wait(submitRestore(backupAgent, tr, tagName, url, ranges, targetVersion, addPrefix, removePrefix,
|
||||
lockDB, incrementalBackupOnly, beginVersion, randomUid));
|
||||
wait(tr->commit());
|
||||
break;
|
||||
} catch(Error &e) {
|
||||
|
@ -4619,7 +4664,7 @@ public:
|
|||
} else {
|
||||
TraceEvent("AS_StartRestore");
|
||||
Version ver = wait(restore(backupAgent, cx, cx, tagName, KeyRef(bc->getURL()), ranges, true, -1, true,
|
||||
addPrefix, removePrefix, true, randomUid));
|
||||
addPrefix, removePrefix, true, false, invalidVersion, randomUid));
|
||||
return ver;
|
||||
}
|
||||
}
|
||||
|
@ -4656,8 +4701,13 @@ Future<Void> FileBackupAgent::atomicParallelRestore(Database cx, Key tagName, St
|
|||
return FileBackupAgentImpl::atomicParallelRestore(this, cx, tagName, ranges, addPrefix, removePrefix);
|
||||
}
|
||||
|
||||
Future<Version> FileBackupAgent::restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url, Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete, Version targetVersion, bool verbose, Key addPrefix, Key removePrefix, bool lockDB) {
|
||||
return FileBackupAgentImpl::restore(this, cx, cxOrig, tagName, url, ranges, waitForComplete, targetVersion, verbose, addPrefix, removePrefix, lockDB, deterministicRandom()->randomUniqueID());
|
||||
Future<Version> FileBackupAgent::restore(Database cx, Optional<Database> cxOrig, Key tagName, Key url,
|
||||
Standalone<VectorRef<KeyRangeRef>> ranges, bool waitForComplete,
|
||||
Version targetVersion, bool verbose, Key addPrefix, Key removePrefix,
|
||||
bool lockDB, bool incrementalBackupOnly, Version beginVersion) {
|
||||
return FileBackupAgentImpl::restore(this, cx, cxOrig, tagName, url, ranges, waitForComplete, targetVersion, verbose,
|
||||
addPrefix, removePrefix, lockDB, incrementalBackupOnly, beginVersion,
|
||||
deterministicRandom()->randomUniqueID());
|
||||
}
|
||||
|
||||
Future<Version> FileBackupAgent::atomicRestore(Database cx, Key tagName, Standalone<VectorRef<KeyRangeRef>> ranges, Key addPrefix, Key removePrefix) {
|
||||
|
@ -4683,9 +4733,9 @@ Future<ERestoreState> FileBackupAgent::waitRestore(Database cx, Key tagName, boo
|
|||
Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr, Key outContainer,
|
||||
int snapshotIntervalSeconds, std::string tagName,
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges, bool stopWhenDone,
|
||||
bool partitionedLog) {
|
||||
bool partitionedLog, bool incrementalBackupOnly) {
|
||||
return FileBackupAgentImpl::submitBackup(this, tr, outContainer, snapshotIntervalSeconds, tagName, backupRanges,
|
||||
stopWhenDone, partitionedLog);
|
||||
stopWhenDone, partitionedLog, incrementalBackupOnly);
|
||||
}
|
||||
|
||||
Future<Void> FileBackupAgent::discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName){
|
||||
|
|
|
@ -27,8 +27,6 @@
|
|||
|
||||
class ClientKnobs : public Knobs {
|
||||
public:
|
||||
int BYTE_LIMIT_UNLIMITED;
|
||||
int ROW_LIMIT_UNLIMITED;
|
||||
|
||||
int TOO_MANY; // FIXME: this should really be split up so we can control these more specifically
|
||||
|
||||
|
|
|
@ -354,7 +354,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
|
|||
"busy_read" : 0,
|
||||
"busy_write" : 0,
|
||||
"count" : 0,
|
||||
"is_recommended": 0
|
||||
"recommended_only": 0
|
||||
},
|
||||
"manual" : {
|
||||
"count" : 0
|
||||
|
|
|
@ -152,6 +152,7 @@ set(FDBSERVER_SRCS
|
|||
workloads/Fuzz.cpp
|
||||
workloads/FuzzApiCorrectness.actor.cpp
|
||||
workloads/HealthMetricsApi.actor.cpp
|
||||
workloads/IncrementalBackup.actor.cpp
|
||||
workloads/Increment.actor.cpp
|
||||
workloads/IndexScan.actor.cpp
|
||||
workloads/Inventory.actor.cpp
|
||||
|
|
|
@ -444,6 +444,7 @@ ACTOR static Future<Void> loadFilesOnLoaders(Reference<ControllerBatchData> batc
|
|||
: std::min(versionBatch.endVersion, request.targetVersion + 1);
|
||||
param.asset.addPrefix = request.addPrefix;
|
||||
param.asset.removePrefix = request.removePrefix;
|
||||
param.asset.batchIndex = batchIndex;
|
||||
|
||||
TraceEvent("FastRestoreControllerPhaseLoadFiles")
|
||||
.detail("BatchIndex", batchIndex)
|
||||
|
|
|
@ -178,10 +178,15 @@ ACTOR Future<Void> traceRoleVersionBatchProgress(Reference<RestoreRoleData> self
|
|||
loop {
|
||||
int batchIndex = self->finishedBatch.get();
|
||||
int maxBatchIndex = self->versionBatchId.get();
|
||||
int maxPrintBatchIndex = batchIndex + SERVER_KNOBS->FASTRESTORE_VB_PARALLELISM;
|
||||
|
||||
TraceEvent ev("FastRestoreVersionBatchProgressState", self->nodeID);
|
||||
ev.detail("Role", role).detail("Node", self->nodeID).detail("FinishedBatch", batchIndex).detail("InitializedBatch", maxBatchIndex);
|
||||
while (batchIndex <= maxBatchIndex) {
|
||||
if (batchIndex > maxPrintBatchIndex) {
|
||||
ev.detail("SkipVersionBatches", maxBatchIndex - batchIndex + 1);
|
||||
break;
|
||||
}
|
||||
std::stringstream typeName;
|
||||
typeName << "VersionBatch" << batchIndex;
|
||||
ev.detail(typeName.str(), self->getVersionBatchState(batchIndex));
|
||||
|
|
|
@ -1632,7 +1632,7 @@ static Future<vector<std::pair<iface, EventMap>>> getServerMetrics(vector<iface>
|
|||
ACTOR template <class iface>
|
||||
static Future<vector<TraceEventFields>> getServerBusiestWriteTags(vector<iface> servers, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, WorkerDetails rkWorker) {
|
||||
state vector<Future<Optional<TraceEventFields>>> futures;
|
||||
for (auto s : servers) {
|
||||
for (const auto& s : servers) {
|
||||
futures.push_back(latestEventOnWorker(rkWorker.interf, s.id().toString() + "/BusiestWriteTag"));
|
||||
}
|
||||
wait(waitForAll(futures));
|
||||
|
@ -1795,6 +1795,7 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
txnSystemPriorityStartOut.updateValues(StatusCounter(gps.getValue("TxnSystemPriorityStartOut")));
|
||||
txnDefaultPriorityStartOut.updateValues(StatusCounter(gps.getValue("TxnDefaultPriorityStartOut")));
|
||||
txnBatchPriorityStartOut.updateValues(StatusCounter(gps.getValue("TxnBatchPriorityStartOut")));
|
||||
txnMemoryErrors.updateValues(StatusCounter(gps.getValue("TxnRequestErrors")));
|
||||
}
|
||||
|
||||
for (auto &ps : proxyStats) {
|
||||
|
@ -1803,7 +1804,6 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
txnConflicts.updateValues( StatusCounter(ps.getValue("TxnConflicts")) );
|
||||
txnCommitOutSuccess.updateValues( StatusCounter(ps.getValue("TxnCommitOutSuccess")) );
|
||||
txnKeyLocationOut.updateValues( StatusCounter(ps.getValue("KeyServerLocationOut")) );
|
||||
txnMemoryErrors.updateValues( StatusCounter(ps.getValue("TxnRequestErrors")) );
|
||||
txnMemoryErrors.updateValues( StatusCounter(ps.getValue("KeyServerLocationErrors")) );
|
||||
txnMemoryErrors.updateValues( StatusCounter(ps.getValue("TxnCommitErrors")) );
|
||||
}
|
||||
|
@ -1876,10 +1876,10 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
autoThrottledTagsObj["busy_read"] = autoThrottledTagsBusyRead;
|
||||
autoThrottledTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
|
||||
if(autoThrottlingEnabled) {
|
||||
autoThrottledTagsObj["is_recommended"] = 0;
|
||||
autoThrottledTagsObj["recommended_only"] = 0;
|
||||
}
|
||||
else {
|
||||
autoThrottledTagsObj["is_recommended"] = 1;
|
||||
autoThrottledTagsObj["recommended_only"] = 1;
|
||||
}
|
||||
|
||||
throttledTagsObj["auto"] = autoThrottledTagsObj;
|
||||
|
|
|
@ -1235,7 +1235,7 @@ ACTOR Future<Void> fetchKeys( StorageCacheData *data, AddingCacheRange* cacheRan
|
|||
try {
|
||||
TEST(true); // Fetching keys for transferred cacheRange
|
||||
|
||||
state Standalone<RangeResultRef> this_block = wait( tryFetchRange( data->cx, fetchVersion, keys, GetRangeLimits( CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) );
|
||||
state Standalone<RangeResultRef> this_block = wait( tryFetchRange( data->cx, fetchVersion, keys, GetRangeLimits( GetRangeLimits::ROW_LIMIT_UNLIMITED, fetchBlockBytes ), &isTooOld ) );
|
||||
|
||||
state int expectedSize = (int)this_block.expectedSize() + (8-(int)sizeof(KeyValueRef))*this_block.size();
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
||||
prefixesMandatory = getOption(options, LiteralStringRef("prefixesMandatory"), std::vector<std::string>());
|
||||
shouldSkipRestoreRanges = deterministicRandom()->random01() < 0.3 ? true : false;
|
||||
|
||||
|
||||
TraceEvent("BARW_ClientId").detail("Id", wcx.clientId);
|
||||
UID randomID = nondeterministicRandom()->randomUniqueID();
|
||||
TraceEvent("BARW_PerformRestore", randomID).detail("Value", performRestore);
|
||||
|
@ -225,7 +225,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
state Future<Void> status = statusLoop(cx, tag.toString());
|
||||
|
||||
try {
|
||||
wait(backupAgent->submitBackup(cx, StringRef(backupContainer), deterministicRandom()->randomInt(0, 100), tag.toString(), backupRanges, stopDifferentialDelay ? false : true));
|
||||
wait(backupAgent->submitBackup(cx, StringRef(backupContainer), deterministicRandom()->randomInt(0, 100),
|
||||
tag.toString(), backupRanges, stopDifferentialDelay ? false : true));
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_DoBackupSubmitBackupException", randomID).error(e).detail("Tag", printable(tag));
|
||||
|
@ -348,7 +349,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
// Try doing a restore without clearing the keys
|
||||
if (rowCount > 0) {
|
||||
try {
|
||||
wait(success(backupAgent->restore(cx, cx, self->backupTag, KeyRef(lastBackupContainer), true, -1, true, normalKeys, Key(), Key(), self->locked)));
|
||||
wait(success(backupAgent->restore(cx, cx, self->backupTag, KeyRef(lastBackupContainer), true, -1, true,
|
||||
normalKeys, Key(), Key(), self->locked)));
|
||||
TraceEvent(SevError, "BARW_RestoreAllowedOverwrittingDatabase", randomID);
|
||||
ASSERT(false);
|
||||
}
|
||||
|
@ -418,7 +420,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
if (!self->locked && BUGGIFY) {
|
||||
TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag));
|
||||
try {
|
||||
extraBackup = backupAgent.submitBackup(cx, LiteralStringRef("file://simfdb/backups/"), deterministicRandom()->randomInt(0, 100), self->backupTag.toString(), self->backupRanges, true);
|
||||
extraBackup = backupAgent.submitBackup(
|
||||
cx, LiteralStringRef("file://simfdb/backups/"), deterministicRandom()->randomInt(0, 100),
|
||||
self->backupTag.toString(), self->backupRanges, true);
|
||||
}
|
||||
catch (Error& e) {
|
||||
TraceEvent("BARW_SubmitBackup2Exception", randomID).error(e).detail("BackupTag", printable(self->backupTag));
|
||||
|
@ -473,7 +477,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
printf("BackupCorrectness, restore for each range: backupAgent.restore is called for "
|
||||
"restoreIndex:%d tag:%s ranges:%s\n",
|
||||
restoreIndex, range.toString().c_str(), restoreTag.toString().c_str());
|
||||
restores.push_back(backupAgent.restore(cx, cx, restoreTag, KeyRef(lastBackupContainer->getURL()), true, targetVersion, true, range, Key(), Key(), self->locked));
|
||||
restores.push_back(
|
||||
backupAgent.restore(cx, cx, restoreTag, KeyRef(lastBackupContainer->getURL()), true,
|
||||
targetVersion, true, range, Key(), Key(), self->locked));
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
@ -482,7 +488,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
restoreTags.push_back(restoreTag);
|
||||
printf("BackupCorrectness, backupAgent.restore is called for restoreIndex:%d tag:%s\n",
|
||||
restoreIndex, restoreTag.toString().c_str());
|
||||
restores.push_back(backupAgent.restore(cx, cx, restoreTag, KeyRef(lastBackupContainer->getURL()), self->restoreRanges, true, targetVersion, true, Key(), Key(), self->locked));
|
||||
restores.push_back(backupAgent.restore(cx, cx, restoreTag, KeyRef(lastBackupContainer->getURL()),
|
||||
self->restoreRanges, true, targetVersion, true, Key(), Key(),
|
||||
self->locked));
|
||||
}
|
||||
|
||||
// Sometimes kill and restart the restore
|
||||
|
@ -498,7 +506,9 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
tr->clear(range);
|
||||
return Void();
|
||||
}));
|
||||
restores[restoreIndex] = backupAgent.restore(cx, cx, restoreTags[restoreIndex], KeyRef(lastBackupContainer->getURL()), self->restoreRanges, true, -1, true, Key(), Key(), self->locked);
|
||||
restores[restoreIndex] = backupAgent.restore(
|
||||
cx, cx, restoreTags[restoreIndex], KeyRef(lastBackupContainer->getURL()),
|
||||
self->restoreRanges, true, -1, true, Key(), Key(), self->locked);
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* IncrementalBackup.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BackupContainer.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct IncrementalBackupWorkload : TestWorkload {
|
||||
|
||||
Standalone<StringRef> backupDir;
|
||||
Standalone<StringRef> tag;
|
||||
FileBackupAgent backupAgent;
|
||||
bool submitOnly;
|
||||
bool restoreOnly;
|
||||
|
||||
IncrementalBackupWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
backupDir = getOption(options, LiteralStringRef("backupDir"), LiteralStringRef("file://simfdb/backups/"));
|
||||
tag = getOption(options, LiteralStringRef("tag"), LiteralStringRef("default"));
|
||||
submitOnly = getOption(options, LiteralStringRef("submitOnly"), false);
|
||||
restoreOnly = getOption(options, LiteralStringRef("restoreOnly"), false);
|
||||
}
|
||||
|
||||
virtual std::string description() { return "IncrementalBackup"; }
|
||||
|
||||
virtual Future<Void> setup(Database const& cx) { return Void(); }
|
||||
|
||||
virtual Future<Void> start(Database const& cx) {
|
||||
if (clientId) {
|
||||
return Void();
|
||||
}
|
||||
return _start(cx, this);
|
||||
}
|
||||
|
||||
virtual Future<bool> check(Database const& cx) { return true; }
|
||||
|
||||
ACTOR static Future<Void> _start(Database cx, IncrementalBackupWorkload* self) {
|
||||
// Add a commit both before the submit and restore to test that incremental backup
|
||||
// can be performed on non-empty database
|
||||
if (self->submitOnly) {
|
||||
Standalone<VectorRef<KeyRangeRef>> backupRanges;
|
||||
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
|
||||
TraceEvent("IBackupSubmitAttempt");
|
||||
try {
|
||||
wait(self->backupAgent.submitBackup(cx, self->backupDir, 1e8, self->tag.toString(), backupRanges, false,
|
||||
false, true));
|
||||
} catch (Error& e) {
|
||||
if (e.code() != error_code_backup_duplicate) {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
TraceEvent("IBackupSubmitSuccess");
|
||||
}
|
||||
if (self->restoreOnly) {
|
||||
state Reference<IBackupContainer> backupContainer;
|
||||
state UID backupUID;
|
||||
TraceEvent("IBackupRestoreAttempt");
|
||||
wait(success(self->backupAgent.waitBackup(cx, self->tag.toString(), false, &backupContainer, &backupUID)));
|
||||
// TODO: add testing scenario for atomics and beginVersion
|
||||
wait(success(self->backupAgent.restore(cx, cx, Key(self->tag.toString()), Key(backupContainer->getURL()),
|
||||
true, -1, true, normalKeys, Key(), Key(), true, true)));
|
||||
TraceEvent("IBackupRestoreSuccess");
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
virtual void getMetrics(vector<PerfMetric>& m) {}
|
||||
};
|
||||
|
||||
WorkloadFactory<IncrementalBackupWorkload> IncrementalBackupWorkloadFactory("IncrementalBackup");
|
|
@ -26,7 +26,6 @@
|
|||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct TagThrottleApiWorkload : TestWorkload {
|
||||
int apiVersion;
|
||||
bool autoThrottleEnabled;
|
||||
double testDuration;
|
||||
|
||||
|
@ -45,20 +44,6 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
virtual Future<Void> start(Database const& cx) {
|
||||
// choose a version to check compatibility.
|
||||
double choice = deterministicRandom()->random01();
|
||||
if(choice < 0.3) {
|
||||
apiVersion = 630;
|
||||
}
|
||||
else if(choice < 0.7){
|
||||
apiVersion = 700;
|
||||
}
|
||||
else {
|
||||
apiVersion = Database::API_VERSION_LATEST;
|
||||
}
|
||||
TraceEvent("VersionStampApiVersion").detail("ApiVersion", apiVersion);
|
||||
cx->apiVersion = apiVersion;
|
||||
|
||||
if (this->clientId != 0) return Void();
|
||||
return timeout(runThrottleApi(this, cx), testDuration, Void());
|
||||
}
|
||||
|
@ -164,10 +149,6 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
else if(tag.expirationTime > now()) {
|
||||
++activeAutoThrottledTags;
|
||||
}
|
||||
|
||||
if(self->apiVersion == 630) {
|
||||
ASSERT(tag.reason == TagThrottledReason::UNSET);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(manualThrottledTags <= SERVER_KNOBS->MAX_MANUAL_THROTTLED_TRANSACTION_TAGS);
|
||||
|
@ -191,9 +172,6 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
|
||||
for(auto& tag : tags) {
|
||||
ASSERT(tag.throttleType == TagThrottleType::AUTO);
|
||||
if(self->apiVersion == 630) {
|
||||
ASSERT(tag.reason == TagThrottledReason::UNSET);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -120,6 +120,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES fast/CycleTest.toml)
|
||||
add_fdb_test(TEST_FILES fast/FuzzApiCorrectness.toml)
|
||||
add_fdb_test(TEST_FILES fast/FuzzApiCorrectnessClean.toml)
|
||||
add_fdb_test(TEST_FILES fast/IncrementalBackup.toml)
|
||||
add_fdb_test(TEST_FILES fast/IncrementTest.toml)
|
||||
add_fdb_test(TEST_FILES fast/InventoryTestAlmostReadOnly.toml)
|
||||
add_fdb_test(TEST_FILES fast/InventoryTestSomeWrites.toml)
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
[[test]]
|
||||
testTitle = 'SubmitBackup'
|
||||
simBackupAgents = 'BackupToFile'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'IncrementalBackup'
|
||||
tag = 'default'
|
||||
submitOnly = true
|
||||
|
||||
[[test]]
|
||||
testTitle = 'CycleTest'
|
||||
clearAfterTest = true
|
||||
simBackupAgents = 'BackupToFile'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'Cycle'
|
||||
nodeCount = 3000
|
||||
transactionsPerSecond = 3000.0
|
||||
testDuration = 120.0
|
||||
expectedRate = 0
|
||||
|
||||
[[test]]
|
||||
testTitle = 'SubmitRestore'
|
||||
clearAfterTest = false
|
||||
simBackupAgents = 'BackupToFile'
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'IncrementalBackup'
|
||||
tag = 'default'
|
||||
restoreOnly = true
|
||||
|
||||
|
||||
[[test]]
|
||||
testTitle = 'VerifyCycle'
|
||||
checkOnly = true
|
||||
|
||||
[[test.workload]]
|
||||
testName = 'Cycle'
|
||||
nodeCount = 3000
|
||||
transactionsPerSecond = 3000.0
|
||||
testDuration = 10.0
|
||||
expectedRate = 0
|
Loading…
Reference in New Issue