Merge remote-tracking branch 'upstream/master' into backup-agent

RenxuanW 2021-04-20 22:54:47 -07:00
commit ba60f18ebf
21 changed files with 231 additions and 116 deletions

View File

@ -37,6 +37,7 @@ RUN sed -i -e '/enabled/d' /etc/yum.repos.d/CentOS-Base.repo && \
lz4-devel \
lz4-static \
mono-devel \
redhat-lsb-core \
rpm-build \
tcl-devel \
unzip \

View File

@ -34,6 +34,7 @@ RUN rpmkeys --import mono-project.com.rpmkey.pgp && \
lz4-devel \
lz4-static \
mono-devel \
redhat-lsb-core \
rpm-build \
tcl-devel \
unzip \

View File

@ -244,6 +244,9 @@ The ``start`` subcommand is used to start a backup. If there is already a backu
``-s <DURATION>`` or ``--snapshot_interval <DURATION>``
Specifies the duration, in seconds, of the inconsistent snapshots written to the backup in continuous mode. The default is 864000, which is 10 days.
``--initial_snapshot_interval <DURATION>``
Specifies the duration, in seconds, of the first inconsistent snapshot written to the backup. The default is 0, which means as fast as possible.
``--partitioned_log_experimental``
Specifies that the backup should use the partitioned mutation logs generated by backup workers. As of FDB 6.3 this option is experimental, and restoring the database from the generated files requires using fast restore. The default is to use the non-partitioned mutation logs generated by backup agents.
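For example, the following hypothetical invocation starts a continuous backup whose first snapshot is written over roughly one minute and whose subsequent snapshots each span one day (the cluster file and backup URL are placeholders)::
user@host$ fdbbackup start -C fdb.cluster -d file:///backups/ -s 86400 --initial_snapshot_interval 60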

View File

@ -315,7 +315,7 @@ and pass the test with ``-f``:
.. code-block:: sh
fdbserver -r simulator -f testfile.txt
fdbserver -r simulation -f testfile.txt
Running a Workload on an actual Cluster

View File

@ -106,6 +106,7 @@ enum {
// Backup constants
OPT_DESTCONTAINER,
OPT_SNAPSHOTINTERVAL,
OPT_INITIAL_SNAPSHOT_INTERVAL,
OPT_ERRORLIMIT,
OPT_NOSTOPWHENDONE,
OPT_EXPIRE_BEFORE_VERSION,
@ -233,6 +234,7 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
{ OPT_USE_PARTITIONED_LOG, "--partitioned_log_experimental", SO_NONE },
{ OPT_SNAPSHOTINTERVAL, "-s", SO_REQ_SEP },
{ OPT_SNAPSHOTINTERVAL, "--snapshot_interval", SO_REQ_SEP },
{ OPT_INITIAL_SNAPSHOT_INTERVAL, "--initial_snapshot_interval", SO_REQ_SEP },
{ OPT_TAGNAME, "-t", SO_REQ_SEP },
{ OPT_TAGNAME, "--tagname", SO_REQ_SEP },
{ OPT_BACKUPKEYS, "-k", SO_REQ_SEP },
@ -1880,6 +1882,7 @@ ACTOR Future<Void> submitDBBackup(Database src,
ACTOR Future<Void> submitBackup(Database db,
std::string url,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
std::string tagName,
@ -1936,6 +1939,7 @@ ACTOR Future<Void> submitBackup(Database db,
else {
wait(backupAgent.submitBackup(db,
KeyRef(url),
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
backupRanges,
@ -3213,6 +3217,8 @@ int main(int argc, char* argv[]) {
std::string destinationContainer;
bool describeDeep = false;
bool describeTimestamps = false;
int initialSnapshotIntervalSeconds =
0; // The initial snapshot has a desired duration of 0, meaning go as fast as possible.
int snapshotIntervalSeconds = CLIENT_KNOBS->BACKUP_DEFAULT_SNAPSHOT_INTERVAL_SEC;
std::string clusterFile;
std::string sourceClusterFile;
@ -3467,6 +3473,7 @@ int main(int argc, char* argv[]) {
modifyOptions.destURL = destinationContainer;
break;
case OPT_SNAPSHOTINTERVAL:
case OPT_INITIAL_SNAPSHOT_INTERVAL:
case OPT_MOD_ACTIVE_INTERVAL: {
const char* a = args->OptionArg();
int seconds;
@ -3478,6 +3485,8 @@ int main(int argc, char* argv[]) {
if (optId == OPT_SNAPSHOTINTERVAL) {
snapshotIntervalSeconds = seconds;
modifyOptions.snapshotIntervalSeconds = seconds;
} else if (optId == OPT_INITIAL_SNAPSHOT_INTERVAL) {
initialSnapshotIntervalSeconds = seconds;
} else if (optId == OPT_MOD_ACTIVE_INTERVAL) {
modifyOptions.activeSnapshotIntervalSeconds = seconds;
}
@ -3837,6 +3846,7 @@ int main(int argc, char* argv[]) {
openBackupContainer(argv[0], destinationContainer);
f = stopAfter(submitBackup(db,
destinationContainer,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
backupKeys,
tagName,

View File

@ -357,6 +357,7 @@ public:
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr,
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -365,6 +366,7 @@ public:
bool incrementalBackupOnly = false);
Future<Void> submitBackup(Database cx,
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -374,6 +376,7 @@ public:
return runRYWTransactionFailIfLocked(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return submitBackup(tr,
outContainer,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
backupRanges,
@ -404,7 +407,8 @@ public:
Future<std::string> getStatus(Database cx, bool showErrors, std::string tagName);
Future<std::string> getStatusJSON(Database cx, std::string tagName);
Future<Optional<Version>> getLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName,
Future<Optional<Version>> getLastRestorable(Reference<ReadYourWritesTransaction> tr,
Key tagName,
bool snapshot = false);
void setLastRestorable(Reference<ReadYourWritesTransaction> tr, Key tagName, Version version);
@ -488,6 +492,14 @@ public:
[=](Reference<ReadYourWritesTransaction> tr) { return unlockBackup(tr, tagName); });
}
// Specifies the action to take on the backup's destination key range
// before the backup begins.
enum PreBackupAction {
NONE = 0, // No action is taken
VERIFY = 1, // Verify that the destination key range is empty.
CLEAR = 2 // Clear the destination key range.
};
Future<Void> submitBackup(Reference<ReadYourWritesTransaction> tr,
Key tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -495,7 +507,7 @@ public:
Key addPrefix = StringRef(),
Key removePrefix = StringRef(),
bool lockDatabase = false,
bool databasesInSync = false);
PreBackupAction backupAction = PreBackupAction::VERIFY);
Future<Void> submitBackup(Database cx,
Key tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -503,10 +515,10 @@ public:
Key addPrefix = StringRef(),
Key removePrefix = StringRef(),
bool lockDatabase = false,
bool databasesInSync = false) {
PreBackupAction backupAction = PreBackupAction::VERIFY) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) {
return submitBackup(
tr, tagName, backupRanges, stopWhenDone, addPrefix, removePrefix, lockDatabase, databasesInSync);
tr, tagName, backupRanges, stopWhenDone, addPrefix, removePrefix, lockDatabase, backupAction);
});
}
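To make the new parameter concrete, here is a minimal sketch of a caller (assumed to run inside an ACTOR; the database handles, prefix, and ranges are hypothetical, and the call shape mirrors the BackupToDBCorrectness changes later in this diff):

// Hypothetical DR submission: clear the destination ranges up front with
// PreBackupAction::CLEAR instead of requiring them to be empty (VERIFY, the
// default) or skipping the pre-backup step entirely (NONE, which replaces the
// old databasesInSync=true).
state DatabaseBackupAgent drAgent(sourceDb);
wait(drAgent.submitBackup(destDb,
LiteralStringRef("default"), // tagName
backupRanges, // Standalone<VectorRef<KeyRangeRef>>
false, // stopWhenDone: keep running differentially
backupPrefix, // addPrefix applied in the destination
StringRef(), // removePrefix
true, // lockDatabase
DatabaseBackupAgent::PreBackupAction::CLEAR));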
@ -835,6 +847,11 @@ public:
typedef KeyBackedMap<Key, bool> RangeDispatchMapT;
RangeDispatchMapT snapshotRangeDispatchMap() { return configSpace.pack(LiteralStringRef(__FUNCTION__)); }
// Interval to use for the first (initial) snapshot.
KeyBackedProperty<int64_t> initialSnapshotIntervalSeconds() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Interval to use for determining the target end version for new snapshots
KeyBackedProperty<int64_t> snapshotIntervalSeconds() { return configSpace.pack(LiteralStringRef(__FUNCTION__)); }
@ -864,8 +881,9 @@ public:
Future<Version> beginVersion = tr->getReadVersion();
Future<int64_t> defaultInterval = 0;
if (intervalSeconds < 0)
if (intervalSeconds < 0) {
defaultInterval = copy.snapshotIntervalSeconds().getOrThrow(tr);
}
// Make sure read version and possibly the snapshot interval value are ready, then clear/init the snapshot
// config members

View File

@ -743,6 +743,9 @@ ACTOR Future<Void> applyMutations(Database cx,
wait(coalesceKeyVersionCache(
uid, newEndVersion, keyVersion, commit, committedVersion, addActor, &commitLock));
beginVersion = newEndVersion;
if (BUGGIFY) {
wait(delay(2.0));
}
}
} catch (Error& e) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarnAlways : SevError, "ApplyMutationsError")

View File

@ -1072,7 +1072,7 @@ struct CopyLogsTaskFunc : TaskFuncBase {
wait(waitForAll(addTaskVector) && taskBucket->finish(tr, task));
} else {
if (appliedVersion <= stopVersionData) {
if (appliedVersion < applyVersion) {
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
wait(success(CopyLogsTaskFunc::addTask(
tr, taskBucket, task, prevBeginVersion, beginVersion, TaskCompletionKey::signal(onDone))));
@ -2243,17 +2243,18 @@ struct StartFullBackupTaskFunc : TaskFuncBase {
return Void();
}
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr,
Reference<TaskBucket> taskBucket,
Key logUid,
Key backupUid,
Key keyAddPrefix,
Key keyRemovePrefix,
Key keyConfigBackupRanges,
Key tagName,
TaskCompletionKey completionKey,
Reference<TaskFuture> waitFor = Reference<TaskFuture>(),
bool databasesInSync = false) {
ACTOR static Future<Key> addTask(
Reference<ReadYourWritesTransaction> tr,
Reference<TaskBucket> taskBucket,
Key logUid,
Key backupUid,
Key keyAddPrefix,
Key keyRemovePrefix,
Key keyConfigBackupRanges,
Key tagName,
TaskCompletionKey completionKey,
Reference<TaskFuture> waitFor = Reference<TaskFuture>(),
DatabaseBackupAgent::PreBackupAction backupAction = DatabaseBackupAgent::PreBackupAction::VERIFY) {
Key doneKey = wait(completionKey.get(tr, taskBucket));
auto task = makeReference<Task>(StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version, doneKey);
@ -2264,7 +2265,7 @@ struct StartFullBackupTaskFunc : TaskFuncBase {
task->params[BackupAgentBase::keyConfigBackupRanges] = keyConfigBackupRanges;
task->params[BackupAgentBase::keyTagName] = tagName;
task->params[DatabaseBackupAgent::keyDatabasesInSync] =
databasesInSync ? LiteralStringRef("t") : LiteralStringRef("f");
backupAction == DatabaseBackupAgent::PreBackupAction::NONE ? LiteralStringRef("t") : LiteralStringRef("f");
if (!waitFor) {
return taskBucket->addTask(tr,
@ -2514,7 +2515,7 @@ public:
Key addPrefix,
Key removePrefix,
bool lockDB,
bool databasesInSync) {
DatabaseBackupAgent::PreBackupAction backupAction) {
state UID logUid = deterministicRandom()->randomUniqueID();
state Key logUidValue = BinaryWriter::toValue(logUid, Unversioned());
state UID logUidCurrent = wait(backupAgent->getLogUid(tr, tagName));
@ -2558,7 +2559,7 @@ public:
}
}
if (!databasesInSync) {
if (backupAction == DatabaseBackupAgent::PreBackupAction::VERIFY) {
// Make sure all of the ranges are empty before we backup into them.
state std::vector<Future<Standalone<RangeResultRef>>> backupIntoResults;
for (auto& backupRange : backupRanges) {
@ -2572,6 +2573,11 @@ public:
throw restore_destination_not_empty();
}
}
} else if (backupAction == DatabaseBackupAgent::PreBackupAction::CLEAR) {
// Clear out all ranges before we backup into them.
for (auto& backupRange : backupRanges) {
tr->clear(backupRange.removePrefix(removePrefix).withPrefix(addPrefix));
}
}
// Clear the backup ranges for the tag
@ -2610,7 +2616,7 @@ public:
tr->clear(KeyRangeRef(mapPrefix, mapEnd));
state Version readVersion = invalidVersion;
if (databasesInSync) {
if (backupAction == DatabaseBackupAgent::PreBackupAction::NONE) {
Transaction readTransaction(backupAgent->taskBucket->src);
readTransaction.setOption(FDBTransactionOptions::LOCK_AWARE);
Version _ = wait(readTransaction.getReadVersion());
@ -2629,7 +2635,7 @@ public:
tagName,
TaskCompletionKey::noSignal(),
Reference<TaskFuture>(),
databasesInSync));
backupAction));
if (lockDB)
wait(lockDatabase(tr, logUid));
@ -2772,8 +2778,14 @@ public:
TraceEvent("DBA_SwitchoverVersionUpgraded");
try {
wait(drAgent.submitBackup(
backupAgent->taskBucket->src, tagName, backupRanges, false, addPrefix, removePrefix, true, true));
wait(drAgent.submitBackup(backupAgent->taskBucket->src,
tagName,
backupRanges,
false,
addPrefix,
removePrefix,
true,
DatabaseBackupAgent::PreBackupAction::NONE));
} catch (Error& e) {
if (e.code() != error_code_backup_duplicate)
throw;
@ -3236,9 +3248,9 @@ Future<Void> DatabaseBackupAgent::submitBackup(Reference<ReadYourWritesTransacti
Key addPrefix,
Key removePrefix,
bool lockDatabase,
bool databasesInSync) {
PreBackupAction backupAction) {
return DatabaseBackupAgentImpl::submitBackup(
this, tr, tagName, backupRanges, stopWhenDone, addPrefix, removePrefix, lockDatabase, databasesInSync);
this, tr, tagName, backupRanges, stopWhenDone, addPrefix, removePrefix, lockDatabase, backupAction);
}
Future<Void> DatabaseBackupAgent::discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName) {

View File

@ -2802,9 +2802,9 @@ struct StartFullBackupTaskFunc : BackupTaskFuncBase {
state Reference<TaskFuture> backupFinished = futureBucket->future(tr);
// Initialize the initial snapshot and create tasks to continually write logs and snapshots
// The initial snapshot has a desired duration of 0, meaning go as fast as possible.
wait(config.initNewSnapshot(tr, 0));
// Initialize the initial snapshot and create tasks to continually write logs and snapshots.
state Optional<int64_t> initialSnapshotIntervalSeconds = wait(config.initialSnapshotIntervalSeconds().get(tr));
wait(config.initNewSnapshot(tr, initialSnapshotIntervalSeconds.orDefault(0)));
// Using priority 1 for both of these to at least start both tasks soon
// Do not add snapshot task if we only want the incremental backup
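For orientation, the interval stored here ultimately becomes a target end version for the snapshot dispatcher. A rough sketch of that conversion, assuming the standard rate of 1,000,000 versions per second (the real logic lives in BackupConfig::initNewSnapshot, which is called above but not shown in this hunk):

// Sketch: a snapshot interval in seconds maps onto a window of versions.
Version snapshotBeginVersion = wait(tr->getReadVersion());
Version snapshotTargetEndVersion =
snapshotBeginVersion + int64_t(intervalSeconds) * CLIENT_KNOBS->CORE_VERSIONSPERSECOND;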
@ -4467,6 +4467,7 @@ public:
ACTOR static Future<Void> submitBackup(FileBackupAgent* backupAgent,
Reference<ReadYourWritesTransaction> tr,
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -4582,6 +4583,7 @@ public:
config.backupContainer().set(tr, bc);
config.stopWhenDone().set(tr, stopWhenDone);
config.backupRanges().set(tr, normalizedRanges);
config.initialSnapshotIntervalSeconds().set(tr, initialSnapshotIntervalSeconds);
config.snapshotIntervalSeconds().set(tr, snapshotIntervalSeconds);
config.partitionedLogEnabled().set(tr, partitionedLog);
config.incrementalBackupOnly().set(tr, incrementalBackupOnly);
@ -4615,8 +4617,9 @@ public:
restoreRanges.push_back(KeyRange(KeyRangeRef(restoreRange.range().begin, restoreRange.range().end)));
}
}
for (auto& restoreRange : restoreRanges)
ASSERT(restoreRange.contains(removePrefix) || removePrefix.size() == 0);
for (auto& restoreRange : restoreRanges) {
ASSERT(restoreRange.begin.startsWith(removePrefix) && restoreRange.end.startsWith(removePrefix));
}
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
@ -5261,6 +5264,11 @@ public:
bool incrementalBackupOnly,
Version beginVersion,
UID randomUid) {
// The restore command line tool won't allow ranges to be empty, but correctness workloads somehow might.
if (ranges.empty()) {
throw restore_error();
}
state Reference<IBackupContainer> bc = IBackupContainer::openContainer(url.toString());
state BackupDescription desc = wait(bc->describeBackup(true));
@ -5570,6 +5578,7 @@ Future<ERestoreState> FileBackupAgent::waitRestore(Database cx, Key tagName, boo
Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction> tr,
Key outContainer,
int initialSnapshotIntervalSeconds,
int snapshotIntervalSeconds,
std::string tagName,
Standalone<VectorRef<KeyRangeRef>> backupRanges,
@ -5579,6 +5588,7 @@ Future<Void> FileBackupAgent::submitBackup(Reference<ReadYourWritesTransaction>
return FileBackupAgentImpl::submitBackup(this,
tr,
outContainer,
initialSnapshotIntervalSeconds,
snapshotIntervalSeconds,
tagName,
backupRanges,

View File

@ -385,6 +385,7 @@ public:
waitfor.push_back(self->files[1].f->truncate(self->fileExtensionBytes));
self->files[1].size = self->fileExtensionBytes;
} else {
TEST(true); // Truncating DiskQueue file
const int64_t startingSize = self->files[1].size;
self->files[1].size -= std::min(maxShrink, self->files[1].size);
self->files[1].size = std::max(self->files[1].size, self->fileExtensionBytes);

View File

@ -83,7 +83,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH, 16<<10 ); if ( randomize && BUGGIFY ) TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH = 500;
init( DISK_QUEUE_FILE_EXTENSION_BYTES, 10<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_FILE_SHRINK_BYTES, 100<<20 ); // BUGGIFYd per file within the DiskQueue
init( DISK_QUEUE_MAX_TRUNCATE_BYTES, 2<<30 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_BYTES = 0;
init( DISK_QUEUE_MAX_TRUNCATE_BYTES, 2LL<<30 ); if ( randomize && BUGGIFY ) DISK_QUEUE_MAX_TRUNCATE_BYTES = 0;
init( TLOG_DEGRADED_DURATION, 5.0 );
init( MAX_CACHE_VERSIONS, 10e6 );
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
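The LL suffix above is not cosmetic: 2<<30 is evaluated in 32-bit int arithmetic, where 2^31 does not fit in a signed int (undefined behavior, in practice typically a negative value), so the knob never held the intended 2 GiB; the companion header change below widens the knob itself to int64_t. A minimal standalone illustration, not FDB code:

#include <cstdint>
#include <cstdio>

int main() {
// int bad = 2 << 30; // 2^31 overflows a 32-bit signed int: undefined behavior
int64_t good = 2LL << 30; // evaluated as long long: 2147483648 (2 GiB)
std::printf("%lld\n", static_cast<long long>(good));
return 0;
}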

View File

@ -84,7 +84,7 @@ public:
int64_t TLOG_SPILL_REFERENCE_MAX_BYTES_PER_BATCH;
int64_t DISK_QUEUE_FILE_EXTENSION_BYTES; // When we grow the disk queue, by how many bytes should it grow?
int64_t DISK_QUEUE_FILE_SHRINK_BYTES; // When we shrink the disk queue, by how many bytes should it shrink?
int DISK_QUEUE_MAX_TRUNCATE_BYTES; // A truncate larger than this will cause the file to be replaced instead.
int64_t DISK_QUEUE_MAX_TRUNCATE_BYTES; // A truncate larger than this will cause the file to be replaced instead.
double TLOG_DEGRADED_DURATION;
int64_t MAX_CACHE_VERSIONS;
double TXS_POPPED_MAX_DELAY;

View File

@ -1212,6 +1212,8 @@ ACTOR Future<Void> moveKeys(Database cx,
return Void();
}
// Called by the master server to write the very first transaction to the database
// establishing a set of shard servers and all invariants of the systemKeys.
void seedShardServers(Arena& arena, CommitTransactionRef& tr, vector<StorageServerInterface> servers) {
std::map<Optional<Value>, Tag> dcId_locality;
std::map<UID, Tag> server_tag;
@ -1232,23 +1234,27 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, vector<StorageServ
tr.read_snapshot = 0;
tr.read_conflict_ranges.push_back_deep(arena, allKeys);
for (int s = 0; s < servers.size(); s++) {
tr.set(arena, serverTagKeyFor(servers[s].id()), serverTagValue(server_tag[servers[s].id()]));
tr.set(arena, serverListKeyFor(servers[s].id()), serverListValue(servers[s]));
for (auto& s : servers) {
tr.set(arena, serverTagKeyFor(s.id()), serverTagValue(server_tag[s.id()]));
tr.set(arena, serverListKeyFor(s.id()), serverListValue(s));
}
std::vector<Tag> serverTags;
std::vector<UID> serverSrcUID;
serverTags.reserve(servers.size());
for (int i = 0; i < servers.size(); i++)
serverTags.push_back(server_tag[servers[i].id()]);
for (auto& s : servers) {
serverTags.push_back(server_tag[s.id()]);
serverSrcUID.push_back(s.id());
}
auto ksValue = CLIENT_KNOBS->TAG_ENCODE_KEY_SERVERS ? keyServersValue(serverTags)
: keyServersValue(Standalone<RangeResultRef>(), serverSrcUID);
// We have to set this range in two blocks, because the master tracking of "keyServersLocations" depends on a
// change to a specific key (keyServersKeyServersKey)
krmSetPreviouslyEmptyRange(
tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), keyServersValue(serverTags), Value());
krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());
for (int s = 0; s < servers.size(); s++)
krmSetPreviouslyEmptyRange(
tr, arena, serverKeysPrefixFor(servers[s].id()), allKeys, serverKeysTrue, serverKeysFalse);
for (auto& s : servers) {
krmSetPreviouslyEmptyRange(tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysTrue, serverKeysFalse);
}
}
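For context on the krm helper used above: a "key range map" stores one entry per boundary key, so krmSetPreviouslyEmptyRange can install a range by writing just its two endpoints, which is safe here only because this seed transaction knows the map is empty. Roughly, its effect is (a sketch of the semantics, not the actual implementation):

// Sets keys.begin -> newValue and keys.end -> oldEndValue under mapPrefix.
tr.set(arena, keys.begin.withPrefix(mapPrefix, arena), newValue);
tr.set(arena, keys.end.withPrefix(mapPrefix, arena), oldEndValue);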

View File

@ -93,6 +93,7 @@ struct AtomicRestoreWorkload : TestWorkload {
try {
wait(backupAgent.submitBackup(cx,
StringRef(backupContainer),
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
BackupAgentBase::getDefaultTagName(),
self->backupRanges,

View File

@ -222,6 +222,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx,
StringRef(backupContainer),
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
tag.toString(),
backupRanges,
@ -477,6 +478,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
// the configuration to disable backup workers before restore.
extraBackup = backupAgent.submitBackup(cx,
LiteralStringRef("file://simfdb/backups/"),
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
self->backupRanges,

View File

@ -33,8 +33,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
int backupRangesCount, backupRangeLengthMax;
bool differentialBackup, performRestore, agentRequest;
Standalone<VectorRef<KeyRangeRef>> backupRanges;
std::vector<std::string> prefixesMandatory;
Standalone<VectorRef<KeyRangeRef>> skipRestoreRanges;
std::vector<std::string> restorePrefixesToInclude;
std::vector<Standalone<KeyRangeRef>> skippedRestoreRanges;
Standalone<VectorRef<KeyRangeRef>> restoreRanges;
static int backupAgentRequests;
bool locked;
@ -68,7 +68,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
agentRequest = getOption(options, LiteralStringRef("simBackupAgents"), true);
allowPauses = getOption(options, LiteralStringRef("allowPauses"), true);
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
prefixesMandatory = getOption(options, LiteralStringRef("prefixesMandatory"), std::vector<std::string>());
restorePrefixesToInclude = getOption(options, LiteralStringRef("restorePrefixesToInclude"), std::vector<std::string>());
shouldSkipRestoreRanges = deterministicRandom()->random01() < 0.3 ? true : false;
TraceEvent("BARW_ClientId").detail("Id", wcx.clientId);
@ -104,32 +104,45 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
}
}
if (performRestore && !prefixesMandatory.empty() && shouldSkipRestoreRanges) {
if (performRestore && !restorePrefixesToInclude.empty() && shouldSkipRestoreRanges) {
for (auto& range : backupRanges) {
bool intersection = false;
for (auto& prefix : prefixesMandatory) {
KeyRange mandatoryRange(KeyRangeRef(prefix, strinc(prefix)));
if (range.intersects(mandatoryRange))
for (auto& prefix : restorePrefixesToInclude) {
KeyRange prefixRange(KeyRangeRef(prefix, strinc(prefix)));
if (range.intersects(prefixRange)) {
intersection = true;
}
TraceEvent("BARW_PrefixSkipRangeDetails")
.detail("PrefixMandatory", printable(mandatoryRange))
.detail("BackUpRange", printable(range))
.detail("PrefixMandatory", printable(prefix))
.detail("BackupRange", printable(range))
.detail("Intersection", intersection);
}
if (!intersection && deterministicRandom()->random01() < 0.5)
skipRestoreRanges.push_back(skipRestoreRanges.arena(), range);
else
restoreRanges.push_back(restoreRanges.arena(), range);
// If the backup range intersects restorePrefixesToInclude, or wins a coin flip, use it as a restore range
// as well; otherwise skip it.
if (intersection || deterministicRandom()->coinflip()) {
restoreRanges.push_back_deep(restoreRanges.arena(), range);
} else {
skippedRestoreRanges.push_back(range);
}
}
} else {
restoreRanges = backupRanges;
}
// If none of the backup ranges intersected restorePrefixesToInclude or won the coin flip, restoreRanges will
// be empty, so move an item from skippedRestoreRanges to restoreRanges.
if (restoreRanges.empty()) {
ASSERT(!skippedRestoreRanges.empty());
restoreRanges.push_back_deep(restoreRanges.arena(), skippedRestoreRanges.back());
skippedRestoreRanges.pop_back();
}
for (auto& range : restoreRanges) {
TraceEvent("BARW_RestoreRange", randomID)
.detail("RangeBegin", printable(range.begin))
.detail("RangeEnd", printable(range.end));
}
for (auto& range : skipRestoreRanges) {
for (auto& range : skippedRestoreRanges) {
TraceEvent("BARW_SkipRange", randomID)
.detail("RangeBegin", printable(range.begin))
.detail("RangeEnd", printable(range.end));
@ -171,8 +184,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
loop {
try {
state int restoreIndex;
for (restoreIndex = 0; restoreIndex < self->skipRestoreRanges.size(); restoreIndex++) {
state KeyRangeRef range = self->skipRestoreRanges[restoreIndex];
for (restoreIndex = 0; restoreIndex < self->skippedRestoreRanges.size(); restoreIndex++) {
state KeyRangeRef range = self->skippedRestoreRanges[restoreIndex];
Standalone<StringRef> restoreTag(self->backupTag.toString() + "_" + std::to_string(restoreIndex));
Standalone<RangeResultRef> res = wait(tr.getRange(range, GetRangeLimits::ROW_LIMIT_UNLIMITED));
if (!res.empty()) {
@ -248,6 +261,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
try {
wait(backupAgent->submitBackup(cx,
StringRef(backupContainer),
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
tag.toString(),
backupRanges,
@ -497,6 +511,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
try {
extraBackup = backupAgent.submitBackup(cx,
LiteralStringRef("file://simfdb/backups/"),
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
self->backupRanges,

View File

@ -29,6 +29,7 @@ struct BackupToBlobWorkload : TestWorkload {
double backupAfter;
Key backupTag;
Standalone<StringRef> backupURL;
int initSnapshotInterval = 0;
int snapshotInterval = 100000;
static constexpr const char* DESCRIPTION = "BackupToBlob";
@ -59,8 +60,12 @@ struct BackupToBlobWorkload : TestWorkload {
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
wait(delay(self->backupAfter));
wait(backupAgent.submitBackup(
cx, self->backupURL, self->snapshotInterval, self->backupTag.toString(), backupRanges));
wait(backupAgent.submitBackup(cx,
self->backupURL,
self->initSnapshotInterval,
self->snapshotInterval,
self->backupTag.toString(),
backupRanges));
EBackupState backupStatus = wait(backupAgent.waitBackup(cx, self->backupTag.toString(), true));
TraceEvent("BackupToBlob_BackupStatus").detail("Status", BackupAgentBase::getStateText(backupStatus));
return Void();

View File

@ -24,7 +24,9 @@
#include "fdbserver/workloads/BulkSetup.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// A workload which test the correctness of backup and restore process
// A workload which tests the correctness of the backup and restore process. The
// database must be idle after the restore completes, and this workload checks
// that the restored range does not change once the restore is done.
struct BackupToDBCorrectnessWorkload : TestWorkload {
double backupAfter, abortAndRestartAfter, restoreAfter;
double backupStartAt, restoreStartAfterBackupFinished, stopDifferentialAfter;
@ -145,6 +147,30 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
void getMetrics(vector<PerfMetric>& m) override {}
// Reads a series of key ranges and returns the contents of each range.
ACTOR static Future<std::vector<Standalone<RangeResultRef>>> readRanges(Database cx,
Standalone<VectorRef<KeyRangeRef>> ranges,
StringRef removePrefix) {
loop {
state Transaction tr(cx);
try {
state std::vector<Future<Standalone<RangeResultRef>>> results;
for (auto& range : ranges) {
results.push_back(tr.getRange(range.removePrefix(removePrefix), 1000));
}
wait(waitForAll(results));
std::vector<Standalone<RangeResultRef>> ret;
for (auto result : results) {
ret.push_back(result.get());
}
return ret;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Void> diffRanges(Standalone<VectorRef<KeyRangeRef>> ranges,
StringRef backupPrefix,
Database src,
@ -259,38 +285,33 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
wait(backupAgent->unlockBackup(cx, tag));
}
// The range clear and submitBackup is being done here in the SAME transaction (which does make SubmitBackup's
// range emptiness check pointless in this test) because separating them causes rare errors where the
// SubmitBackup commit result is indeterminite but the submission was in fact successful and the backup actually
// completes before the retry of SubmitBackup so this second call to submit fails because the destination range
// is no longer empty.
// In prior versions of submitBackup, we have seen a rare bug where
// submitBackup results in a commit_unknown_result, causing the backup
// to retry when in fact it had successfully completed. On the retry,
// the range being backed up into was checked to make sure it was
// empty, and this check was failing because the backup had succeeded
// the first time. The old solution for this was to clear the backup
// range in the same transaction as the backup, but now we have
// switched to passing a "pre-backup action" that either verifies the
// range being backed up into is empty or clears it first.
TraceEvent("BARW_DoBackupClearAndSubmitBackup", randomID)
.detail("Tag", printable(tag))
.detail("StopWhenDone", stopDifferentialDelay ? "False" : "True");
try {
state Reference<ReadYourWritesTransaction> tr2(new ReadYourWritesTransaction(self->extraDB));
loop {
try {
for (auto r : self->backupRanges) {
if (!r.empty()) {
auto targetRange = r.withPrefix(self->backupPrefix);
printf("Clearing %s in destination\n", printable(targetRange).c_str());
tr2->addReadConflictRange(targetRange);
tr2->clear(targetRange);
}
}
wait(backupAgent->submitBackup(tr2,
tag,
backupRanges,
stopDifferentialDelay ? false : true,
self->backupPrefix,
StringRef(),
self->locked));
wait(tr2->commit());
break;
} catch (Error& e) {
wait(tr2->onError(e));
try {
wait(backupAgent->submitBackup(cx,
tag,
backupRanges,
stopDifferentialDelay ? false : true,
self->backupPrefix,
StringRef(),
self->locked,
DatabaseBackupAgent::PreBackupAction::CLEAR));
} catch (Error& e) {
TraceEvent("BARW_SubmitBackup1Exception", randomID).error(e);
if (e.code() != error_code_backup_unneeded && e.code() != error_code_backup_duplicate) {
throw;
}
}
} catch (Error& e) {
@ -600,7 +621,8 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
true,
self->extraPrefix,
StringRef(),
self->locked);
self->locked,
DatabaseBackupAgent::PreBackupAction::CLEAR);
} catch (Error& e) {
TraceEvent("BARW_SubmitBackup2Exception", randomID)
.error(e)
@ -620,27 +642,7 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
.detail("BackupTag", printable(self->restoreTag));
// wait(diffRanges(self->backupRanges, self->backupPrefix, cx, self->extraDB));
state Transaction tr3(cx);
loop {
try {
// Run on the first proxy to ensure data is cleared
// when submitting the backup request below.
tr3.setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
for (auto r : self->backupRanges) {
if (!r.empty()) {
tr3.addReadConflictRange(r);
tr3.clear(r);
}
}
wait(tr3.commit());
break;
} catch (Error& e) {
wait(tr3.onError(e));
}
}
Standalone<VectorRef<KeyRangeRef>> restoreRange;
state Standalone<VectorRef<KeyRangeRef>> restoreRange;
for (auto r : self->backupRanges) {
restoreRange.push_back_deep(
restoreRange.arena(),
@ -648,8 +650,14 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
}
try {
wait(restoreTool.submitBackup(
cx, self->restoreTag, restoreRange, true, StringRef(), self->backupPrefix, self->locked));
wait(restoreTool.submitBackup(cx,
self->restoreTag,
restoreRange,
true,
StringRef(),
self->backupPrefix,
self->locked,
DatabaseBackupAgent::PreBackupAction::CLEAR));
} catch (Error& e) {
TraceEvent("BARW_DoBackupSubmitBackupException", randomID)
.error(e)
@ -660,6 +668,22 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
wait(success(restoreTool.waitBackup(cx, self->restoreTag)));
wait(restoreTool.unlockBackup(cx, self->restoreTag));
// Make sure no more data is written to the restored range
// after the restore completes.
state std::vector<Standalone<RangeResultRef>> res1 = wait(readRanges(cx, restoreRange, self->backupPrefix));
wait(delay(5));
state std::vector<Standalone<RangeResultRef>> res2 = wait(readRanges(cx, restoreRange, self->backupPrefix));
ASSERT(res1.size() == res2.size());
for (int i = 0; i < res1.size(); ++i) {
auto range1 = res1.at(i);
auto range2 = res2.at(i);
ASSERT(range1.size() == range2.size());
for (int j = 0; j < range1.size(); ++j) {
ASSERT(range1[j].key == range2[j].key && range1[j].value == range2[j].value);
}
}
}
if (extraBackup.isValid()) {

View File

@ -151,7 +151,7 @@ struct IncrementalBackupWorkload : TestWorkload {
TraceEvent("IBackupSubmitAttempt");
try {
wait(self->backupAgent.submitBackup(
cx, self->backupDir, 1e8, self->tag.toString(), backupRanges, false, false, true));
cx, self->backupDir, 0, 1e8, self->tag.toString(), backupRanges, false, false, true));
} catch (Error& e) {
TraceEvent("IBackupSubmitError").error(e);
if (e.code() != error_code_backup_duplicate) {

View File

@ -33,6 +33,7 @@ struct SubmitBackupWorkload final : TestWorkload {
Standalone<StringRef> backupDir;
Standalone<StringRef> tag;
double delayFor;
int initSnapshotInterval;
int snapshotInterval;
bool stopWhenDone;
bool incremental;
@ -41,6 +42,7 @@ struct SubmitBackupWorkload final : TestWorkload {
backupDir = getOption(options, LiteralStringRef("backupDir"), LiteralStringRef("file://simfdb/backups/"));
tag = getOption(options, LiteralStringRef("tag"), LiteralStringRef("default"));
delayFor = getOption(options, LiteralStringRef("delayFor"), 10.0);
initSnapshotInterval = getOption(options, LiteralStringRef("initSnapshotInterval"), 0);
snapshotInterval = getOption(options, LiteralStringRef("snapshotInterval"), 1e8);
stopWhenDone = getOption(options, LiteralStringRef("stopWhenDone"), true);
incremental = getOption(options, LiteralStringRef("incremental"), false);
@ -55,6 +57,7 @@ struct SubmitBackupWorkload final : TestWorkload {
try {
wait(self->backupAgent.submitBackup(cx,
self->backupDir,
self->initSnapshotInterval,
self->snapshotInterval,
self->tag.toString(),
backupRanges,

View File

@ -50,7 +50,7 @@ simBackupAgents = 'BackupToFile'
restoreAfter = 60.0
performRestore = true
allowPauses = false
prefixesMandatory = 'a,A,m'
restorePrefixesToInclude = 'a,A,m'
[[test.workload]]
testName = 'BackupAndRestoreCorrectness'