Fix all review comments
This commit is contained in:
parent
d6559b144f
commit
1f2602d2b3
|
@ -58,6 +58,7 @@ public:
|
|||
static const Key keyEndKey;
|
||||
static const Key destUid;
|
||||
static const Key backupDone;
|
||||
static const Key backupStartVersion;
|
||||
|
||||
static const Key keyTagName;
|
||||
static const Key keyStates;
|
||||
|
@ -420,7 +421,7 @@ bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key);
|
|||
Version getVersionFromString(std::string const& value);
|
||||
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize = CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE);
|
||||
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
|
||||
Future<Void> clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool clearVersionHistory, Key logUidValue, Key destUidValue, Version beginVersion, Version endVersion);
|
||||
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, bool backupDone, Version beginVersion, Version endVersion, bool checkBackupUid = false, Version backupUid = 0);
|
||||
Key getApplyKey( Version version, Key backupUid );
|
||||
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key);
|
||||
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value);
|
||||
|
@ -719,7 +720,7 @@ public:
|
|||
|
||||
void startMutationLogs(Reference<ReadYourWritesTransaction> tr, KeyRangeRef backupRange, Key destUidValue) {
|
||||
Key mutationLogsDestKey = destUidValue.withPrefix(backupLogKeys.begin);
|
||||
tr->set(logRangesEncodeKey(backupRange.begin, getUid()), logRangesEncodeValue(backupRange.end, mutationLogsDestKey));
|
||||
tr->set(logRangesEncodeKey(backupRange.begin, BinaryReader::fromStringRef<UID>(destUidValue, Unversioned())), logRangesEncodeValue(backupRange.end, mutationLogsDestKey));
|
||||
}
|
||||
|
||||
Future<Void> logError(Database cx, Error e, std::string details, void *taskInstance = nullptr) {
|
||||
|
|
|
@ -37,6 +37,7 @@ const Key BackupAgentBase::keyBeginKey = LiteralStringRef("beginKey");
|
|||
const Key BackupAgentBase::keyEndKey = LiteralStringRef("endKey");
|
||||
const Key BackupAgentBase::destUid = LiteralStringRef("destUid");
|
||||
const Key BackupAgentBase::backupDone = LiteralStringRef("backupDone");
|
||||
const Key BackupAgentBase::backupStartVersion = LiteralStringRef("backupStartVersion");
|
||||
|
||||
const Key BackupAgentBase::keyTagName = LiteralStringRef("tagname");
|
||||
const Key BackupAgentBase::keyStates = LiteralStringRef("state");
|
||||
|
@ -719,28 +720,41 @@ ACTOR Future<Void> _clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool
|
|||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
||||
Optional<Key> v = wait(tr->get(backupLatestVersionsKey));
|
||||
if (!v.present()) {
|
||||
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
// Make sure version history key does exist and lower the beginVersion if needed
|
||||
bool foundSelf = false;
|
||||
for (auto backupVersion : backupVersions) {
|
||||
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
|
||||
|
||||
if (currLogUidValue == logUidValue) {
|
||||
foundSelf = true;
|
||||
beginVersion = std::min(beginVersion, BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned()));
|
||||
}
|
||||
}
|
||||
|
||||
// Do not clear anything if version history key cannot be found
|
||||
if (!foundSelf) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
state Standalone<RangeResultRef> backupVersions = wait(tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
|
||||
Version nextSmallestVersion = endVersion;
|
||||
// If clear version history is required, then we need to clear log ranges up to next smallest version which might be greater than endVersion
|
||||
// If size of backupVersions is greater than 1, we can definitely find a version less than INTMAX_MAX, otherwise we clear all log ranges without calling getLogRanges()
|
||||
Version nextSmallestVersion = clearVersionHistory ? INTMAX_MAX : endVersion;
|
||||
bool clearLogRangesRequired = true;
|
||||
|
||||
// More than one backup/DR with the same range
|
||||
if (backupVersions.size() > 1) {
|
||||
bool countSelf = false;
|
||||
|
||||
for (auto backupVersion : backupVersions) {
|
||||
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
|
||||
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
|
||||
if (currVersion > beginVersion) {
|
||||
if (currVersion < nextSmallestVersion) {
|
||||
nextSmallestVersion = currVersion;
|
||||
}
|
||||
} else if (currVersion == beginVersion && !countSelf) {
|
||||
countSelf = true;
|
||||
|
||||
if (currLogUidValue == logUidValue) {
|
||||
continue;
|
||||
} else if (currVersion > beginVersion) {
|
||||
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
|
||||
} else {
|
||||
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
|
||||
clearLogRangesRequired = false;
|
||||
break;
|
||||
}
|
||||
|
@ -748,8 +762,14 @@ ACTOR Future<Void> _clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool
|
|||
}
|
||||
|
||||
if (clearVersionHistory && backupVersions.size() == 1) {
|
||||
// Clear version history
|
||||
tr->clear(prefixRange(backupLatestVersionsPath));
|
||||
|
||||
// Clear everything under blog/[destUid]
|
||||
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
|
||||
|
||||
// Disable committing mutations into blog
|
||||
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
|
||||
} else {
|
||||
if (clearVersionHistory) {
|
||||
// Clear current backup version history
|
||||
|
@ -771,6 +791,52 @@ ACTOR Future<Void> _clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool
|
|||
return Void();
|
||||
}
|
||||
|
||||
// The difference between beginVersion and endVersion should not be too large
|
||||
Future<Void> clearLogRanges(Reference<ReadYourWritesTransaction> tr, bool clearVersionHistory, Key logUidValue, Key destUidValue, Version beginVersion, Version endVersion) {
|
||||
return _clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, beginVersion, endVersion);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _eraseLogData(Database cx, Key logUidValue, Key destUidValue, bool backupDone, Version beginVersion, Version endVersion, bool checkBackupUid, Version backupUid) {
|
||||
if (endVersion <= beginVersion)
|
||||
return Void();
|
||||
|
||||
state Version currBeginVersion = beginVersion;
|
||||
state Version currEndVersion;
|
||||
state bool clearVersionHistory = false;
|
||||
|
||||
while (currBeginVersion < endVersion) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
|
||||
loop{
|
||||
try {
|
||||
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersion);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
if (checkBackupUid) {
|
||||
Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
|
||||
Optional<Value> v = wait( tr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
|
||||
return Void();
|
||||
}
|
||||
|
||||
if (backupDone && currEndVersion == endVersion) {
|
||||
clearVersionHistory = true;
|
||||
}
|
||||
|
||||
Void _ = wait(clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, currBeginVersion, currEndVersion));
|
||||
Void _ = wait(tr->commit());
|
||||
currBeginVersion = currEndVersion;
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, bool backupDone, Version beginVersion, Version endVersion, bool checkBackupUid, Version backupUid) {
|
||||
return _eraseLogData(cx, logUidValue, destUidValue, backupDone, beginVersion, endVersion, checkBackupUid, backupUid);
|
||||
}
|
|
@ -470,40 +470,6 @@ namespace dbBackup {
|
|||
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
|
||||
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
|
||||
|
||||
ACTOR static Future<Void> eraseLogData(Database cx, Reference<Task> task, bool backupDone, Version beginVersion, Version endVersion) {
|
||||
if (endVersion <= beginVersion)
|
||||
return Void();
|
||||
|
||||
state Version currBeginVersion = beginVersion;
|
||||
state Version currEndVersion;
|
||||
state bool clearVersionHistory = false;
|
||||
|
||||
while (currBeginVersion < endVersion) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
|
||||
loop{
|
||||
try {
|
||||
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersion);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
if (backupDone && currEndVersion == endVersion) {
|
||||
clearVersionHistory = true;
|
||||
}
|
||||
|
||||
Void _ = wait(clearLogRanges(tr, clearVersionHistory, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], currBeginVersion, currEndVersion));
|
||||
Void _ = wait(tr->commit());
|
||||
currBeginVersion = currEndVersion;
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state FlowLock lock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
|
||||
|
||||
|
@ -513,7 +479,7 @@ namespace dbBackup {
|
|||
Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
|
||||
bool backupDone = BinaryReader::fromStringRef<bool>(task->params[DatabaseBackupAgent::backupDone], Unversioned());
|
||||
|
||||
Void _ = wait(eraseLogData(taskBucket->src, task, backupDone, beginVersion, endVersion));
|
||||
Void _ = wait(eraseLogData(taskBucket->src, task->params[BackupAgentBase::keyConfigLogUid], task->params[BackupAgentBase::destUid], backupDone, beginVersion, endVersion, false, BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned())));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -875,9 +841,6 @@ namespace dbBackup {
|
|||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
|
||||
return Void();
|
||||
|
||||
Key configPath = logUidValue.withPrefix(logRangesRange.begin);
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
|
||||
state Key latestVersionKey = logUidValue.withPrefix(task->params[BackupAgentBase::destUid].withPrefix(backupLatestVersionsPrefix));
|
||||
state Optional<Key> bVersion = wait(tr->get(latestVersionKey));
|
||||
|
||||
|
@ -886,48 +849,15 @@ namespace dbBackup {
|
|||
}
|
||||
beginVersion = BinaryReader::fromStringRef<Version>(bVersion.get(), Unversioned());
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
endVersion = tr->getCommittedVersion();
|
||||
endVersion = tr->getReadVersion().get();
|
||||
break;
|
||||
} catch(Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
state bool clearVersionHistory = false;
|
||||
state Version currBeginVersion = beginVersion;
|
||||
state Version currEndVersion;
|
||||
|
||||
while (currBeginVersion < endVersion) {
|
||||
state Reference<ReadYourWritesTransaction> clearSrcTr(new ReadYourWritesTransaction(taskBucket->src));
|
||||
|
||||
loop{
|
||||
try {
|
||||
clearSrcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
clearSrcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
Optional<Value> v = wait( clearSrcTr->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
|
||||
return Void();
|
||||
|
||||
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersion);
|
||||
|
||||
if (currEndVersion == endVersion) {
|
||||
clearVersionHistory = true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
Void _ = wait(clearLogRanges(clearSrcTr, clearVersionHistory, logUidValue, destUidValue, currBeginVersion, currEndVersion));
|
||||
Void _ = wait(clearSrcTr->commit());
|
||||
currBeginVersion = currEndVersion;
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(clearSrcTr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Version backupUid = BinaryReader::fromStringRef<Version>(task->params[BackupAgentBase::keyFolderId], Unversioned());
|
||||
Void _ = wait(eraseLogData(taskBucket->src, logUidValue, destUidValue, true, beginVersion, endVersion, true, backupUid));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -1072,6 +1002,15 @@ namespace dbBackup {
|
|||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.addReadConflictRange(singleKeyRange(sourceStates.pack(DatabaseBackupAgent::keyStateStatus)));
|
||||
tr.set(sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_DIFFERENTIAL)));
|
||||
|
||||
Key versionKey = task->params[DatabaseBackupAgent::keyConfigLogUid].withPrefix(task->params[BackupAgentBase::destUid]).withPrefix(backupLatestVersionsPrefix);
|
||||
Optional<Key> prevBeginVersion = wait(tr.get(versionKey));
|
||||
if (!prevBeginVersion.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
task->params[DatabaseBackupAgent::keyPrevBeginVersion] = prevBeginVersion.get();
|
||||
|
||||
Void _ = wait(tr.commit());
|
||||
return Void();
|
||||
}
|
||||
|
@ -1107,7 +1046,9 @@ namespace dbBackup {
|
|||
tr->set(states.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_DIFFERENTIAL)));
|
||||
|
||||
allPartsDone = futureBucket->future(tr);
|
||||
Key _ = wait(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, 0, restoreVersion, TaskCompletionKey::joinWith(allPartsDone)));
|
||||
|
||||
Version prevBeginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyPrevBeginVersion], Unversioned());
|
||||
Key _ = wait(CopyDiffLogsTaskFunc::addTask(tr, taskBucket, task, prevBeginVersion, restoreVersion, TaskCompletionKey::joinWith(allPartsDone)));
|
||||
|
||||
// After the Backup completes, clear the backup subspace and update the status
|
||||
Key _ = wait(FinishedFullBackupTaskFunc::addTask(tr, taskBucket, task, TaskCompletionKey::noSignal(), allPartsDone));
|
||||
|
@ -1153,8 +1094,9 @@ namespace dbBackup {
|
|||
state UID logUid = BinaryReader::fromStringRef<UID>(logUidValue, Unversioned());
|
||||
|
||||
state Standalone<VectorRef<KeyRangeRef>> backupRanges = BinaryReader::fromStringRef<Standalone<VectorRef<KeyRangeRef>>>(task->params[DatabaseBackupAgent::keyConfigBackupRanges], IncludeVersion());
|
||||
state Key beginVersionKey;
|
||||
|
||||
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(taskBucket->src));
|
||||
state Reference<ReadYourWritesTransaction> srcTr(new ReadYourWritesTransaction(taskBucket->src));
|
||||
loop {
|
||||
try {
|
||||
srcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
|
@ -1172,6 +1114,9 @@ namespace dbBackup {
|
|||
}
|
||||
}
|
||||
|
||||
Version bVersion = wait(srcTr->getReadVersion());
|
||||
beginVersionKey = BinaryWriter::toValue(bVersion, Unversioned());
|
||||
|
||||
task->params[BackupAgentBase::destUid] = destUidValue;
|
||||
|
||||
Void _ = wait(srcTr->commit());
|
||||
|
@ -1189,12 +1134,24 @@ namespace dbBackup {
|
|||
state Future<Void> verified = taskBucket->keepRunning(tr, task);
|
||||
Void _ = wait(verified);
|
||||
|
||||
Subspace config = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(logUidValue);
|
||||
tr->set(config.get(logUidValue).pack(BackupAgentBase::destUid), task->params[BackupAgentBase::destUid]);
|
||||
// Set destUid at destination side
|
||||
state Subspace config = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(logUidValue);
|
||||
tr->set(config.pack(BackupAgentBase::destUid), task->params[BackupAgentBase::destUid]);
|
||||
|
||||
// Use existing beginVersion if we already have one
|
||||
Optional<Key> backupStartVersion = wait(tr->get(config.pack(BackupAgentBase::backupStartVersion)));
|
||||
if (backupStartVersion.present()) {
|
||||
beginVersionKey = backupStartVersion.get();
|
||||
} else {
|
||||
tr->set(config.pack(BackupAgentBase::backupStartVersion), beginVersionKey);
|
||||
}
|
||||
|
||||
task->params[BackupAgentBase::keyBeginVersion] = beginVersionKey;
|
||||
|
||||
Void _ = wait(tr->commit());
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(srcTr->onError(e));
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1206,14 +1163,11 @@ namespace dbBackup {
|
|||
|
||||
state Optional<Value> v = wait( srcTr2->get( sourceStates.pack(DatabaseBackupAgent::keyFolderId) ) );
|
||||
|
||||
state Standalone<StringRef> beginVersion = BinaryWriter::toValue(srcTr2->getReadVersion().get(), Unversioned());
|
||||
task->params[BackupAgentBase::keyBeginVersion] = beginVersion;
|
||||
|
||||
if(v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) >= BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyFolderId], Unversioned()))
|
||||
return Void();
|
||||
|
||||
Key versionKey = logUidValue.withPrefix(destUidValue).withPrefix(backupLatestVersionsPrefix);
|
||||
srcTr2->set(versionKey, beginVersion);
|
||||
srcTr2->set(versionKey, beginVersionKey);
|
||||
|
||||
srcTr2->set( Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceTagName).pack(task->params[BackupAgentBase::keyTagName]), logUidValue );
|
||||
srcTr2->set( sourceStates.pack(DatabaseBackupAgent::keyFolderId), task->params[DatabaseBackupAgent::keyFolderId] );
|
||||
|
@ -1222,7 +1176,7 @@ namespace dbBackup {
|
|||
state Key destPath = destUidValue.withPrefix(backupLogKeys.begin);
|
||||
// Start logging the mutations for the specified ranges of the tag
|
||||
for (auto &backupRange : backupRanges) {
|
||||
srcTr2->set(logRangesEncodeKey(backupRange.begin, logUid), logRangesEncodeValue(backupRange.end, destPath));
|
||||
srcTr2->set(logRangesEncodeKey(backupRange.begin, BinaryReader::fromStringRef<UID>(destUidValue, Unversioned())), logRangesEncodeValue(backupRange.end, destPath));
|
||||
}
|
||||
|
||||
Void _ = wait(srcTr2->commit());
|
||||
|
@ -1273,7 +1227,7 @@ namespace dbBackup {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Key logUid, Key backupUid, /*Key destUid,*/ Key keyAddPrefix, Key keyRemovePrefix, Key keyConfigBackupRanges, Key tagName, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), bool databasesInSync=false)
|
||||
ACTOR static Future<Key> addTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Key logUid, Key backupUid, Key keyAddPrefix, Key keyRemovePrefix, Key keyConfigBackupRanges, Key tagName, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor = Reference<TaskFuture>(), bool databasesInSync=false)
|
||||
{
|
||||
Key doneKey = wait(completionKey.get(tr, taskBucket));
|
||||
Reference<Task> task(new Task(StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version, doneKey));
|
||||
|
@ -1745,10 +1699,9 @@ public:
|
|||
|
||||
srcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(DatabaseBackupAgent::getStateText(BackupAgentBase::STATE_PARTIALLY_ABORTED) ));
|
||||
srcTr->set( backupAgent->sourceStates.get(logUidValue).pack(DatabaseBackupAgent::keyFolderId), backupUid );
|
||||
srcTr->clear(prefixRange(logUidValue.withPrefix(logRangesRange.begin)));
|
||||
|
||||
Void _ = wait(srcTr->commit());
|
||||
endVersion = srcTr->getCommittedVersion();
|
||||
endVersion = srcTr->getCommittedVersion() + 1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -1758,33 +1711,7 @@ public:
|
|||
}
|
||||
|
||||
if (clearSrcDb) {
|
||||
state bool clearVersionHistory = false;
|
||||
state Version currBeginVersion = beginVersion;
|
||||
state Version currEndVersion;
|
||||
|
||||
while (currBeginVersion < endVersion) {
|
||||
state Reference<ReadYourWritesTransaction> clearSrcTr(new ReadYourWritesTransaction(backupAgent->taskBucket->src));
|
||||
|
||||
loop{
|
||||
try {
|
||||
clearSrcTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
clearSrcTr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersion);
|
||||
|
||||
if (currEndVersion == endVersion) {
|
||||
clearSrcTr->set( backupAgent->sourceStates.pack(DatabaseBackupAgent::keyStateStatus), StringRef(BackupAgentBase::getStateText(BackupAgentBase::STATE_ABORTED) ));
|
||||
clearVersionHistory = true;
|
||||
}
|
||||
|
||||
Void _ = wait(clearLogRanges(clearSrcTr, clearVersionHistory, logUidValue, destUidValue, currBeginVersion, currEndVersion));
|
||||
Void _ = wait(clearSrcTr->commit());
|
||||
currBeginVersion = currEndVersion;
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(clearSrcTr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
Void _ = wait(eraseLogData(backupAgent->taskBucket->src, logUidValue, destUidValue, true, beginVersion, endVersion));
|
||||
}
|
||||
|
||||
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
|
||||
|
|
|
@ -1497,7 +1497,7 @@ namespace fileBackup {
|
|||
.detail("ScheduledVersion", scheduledVersion)
|
||||
.detail("BeginKey", range.begin.printable())
|
||||
.detail("EndKey", range.end.printable())
|
||||
.suppressFor(2, true);
|
||||
.suppressFor(2);
|
||||
}
|
||||
else {
|
||||
// This shouldn't happen because if the transaction was already done or if another execution
|
||||
|
@ -1792,39 +1792,6 @@ namespace fileBackup {
|
|||
}
|
||||
} Params;
|
||||
|
||||
ACTOR static Future<Void> eraseLogData(Database cx, Key logUidValue, Key destUidValue, bool backupDone, Version beginVersion, Version endVersion) {
|
||||
if (endVersion <= beginVersion)
|
||||
return Void();
|
||||
|
||||
state Version currBeginVersion = beginVersion;
|
||||
state Version currEndVersion;
|
||||
state bool clearVersionHistory = false;
|
||||
|
||||
while (currBeginVersion < endVersion) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
|
||||
|
||||
loop{
|
||||
try {
|
||||
currEndVersion = std::min(currBeginVersion + CLIENT_KNOBS->CLEAR_LOG_RANGE_COUNT * CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE, endVersion);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
if (backupDone && currEndVersion == endVersion) {
|
||||
clearVersionHistory = true;
|
||||
}
|
||||
Void _ = wait(clearLogRanges(tr, clearVersionHistory, logUidValue, destUidValue, currBeginVersion, currEndVersion));
|
||||
Void _ = wait(tr->commit());
|
||||
currBeginVersion = currEndVersion;
|
||||
break;
|
||||
} catch (Error &e) {
|
||||
Void _ = wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
|
||||
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
|
||||
Void _ = wait(checkTaskVersion(cx, task, EraseLogRangeTaskFunc::name, EraseLogRangeTaskFunc::version));
|
||||
|
@ -2025,10 +1992,7 @@ namespace fileBackup {
|
|||
state BackupConfig backup(task);
|
||||
state UID uid = backup.getUid();
|
||||
|
||||
state Key configPath = uidPrefixKey(logRangesRange.begin, uid);
|
||||
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
state Key destUidValue = wait(backup.destUidValue().getOrThrow(tr));
|
||||
Key _ = wait(EraseLogRangeTaskFunc::addTask(tr, taskBucket, backup.getUid(), TaskCompletionKey::noSignal(), true, destUidValue));
|
||||
|
||||
|
@ -3579,10 +3543,7 @@ public:
|
|||
// Cancel all backup tasks through tag
|
||||
Void _ = wait(tag.cancel(tr));
|
||||
|
||||
Key configPath = uidPrefixKey(logRangesRange.begin, config.getUid());
|
||||
|
||||
tr->setOption(FDBTransactionOptions::COMMIT_ON_FIRST_PROXY);
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
|
||||
state Key destUidValue = wait(config.destUidValue().getOrThrow(tr));
|
||||
state Version endVersion = wait(tr->getReadVersion());
|
||||
|
@ -3627,9 +3588,6 @@ public:
|
|||
// Cancel backup task through tag
|
||||
Void _ = wait(tag.cancel(tr));
|
||||
|
||||
Key configPath = uidPrefixKey(logRangesRange.begin, config.getUid());
|
||||
|
||||
tr->clear(KeyRangeRef(configPath, strinc(configPath)));
|
||||
Key _ = wait(fileBackup::EraseLogRangeTaskFunc::addTask(tr, backupAgent->taskBucket, config.getUid(), TaskCompletionKey::noSignal(), true, destUidValue));
|
||||
|
||||
config.stateEnum().set(tr, EBackupState::STATE_ABORTED);
|
||||
|
|
|
@ -61,13 +61,11 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
UID randomID = g_nondeterministic_random->randomUniqueID();
|
||||
|
||||
if (shareLogRange) {
|
||||
if (g_random->random01() < 0.5) {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
|
||||
} else if (g_random->random01() < 0.75) {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, LiteralStringRef("\x7f")));
|
||||
} else {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(LiteralStringRef("\x7f"), normalKeys.end));
|
||||
}
|
||||
bool beforePrefix = sharedRandomNumber & 1;
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, LiteralStringRef("\xfe\xff\xfe")));
|
||||
else
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(strinc(LiteralStringRef("\x00\x00\x01")), normalKeys.end));
|
||||
} else if (backupRangesCount <= 0) {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
|
||||
} else {
|
||||
|
|
|
@ -58,11 +58,12 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
agentRequest = getOption(options, LiteralStringRef("simDrAgents"), true);
|
||||
shareLogRange = getOption(options, LiteralStringRef("shareLogRange"), false);
|
||||
|
||||
beforePrefix = g_random->random01() < 0.5;
|
||||
// Use sharedRandomNumber if shareLogRange is true so that we can ensure backup and DR both backup the same range
|
||||
beforePrefix = shareLogRange ? (sharedRandomNumber & 1) : (g_random->random01() < 0.5);
|
||||
|
||||
if (beforePrefix) {
|
||||
extraPrefix = backupPrefix.withPrefix(LiteralStringRef("\xfe\xff\xfe"));
|
||||
backupPrefix = backupPrefix.withPrefix(LiteralStringRef("\xfe\xff\xff"));
|
||||
|
||||
}
|
||||
else {
|
||||
extraPrefix = backupPrefix.withPrefix(LiteralStringRef("\x00\x00\x01"));
|
||||
|
@ -76,13 +77,10 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
|
|||
UID randomID = g_nondeterministic_random->randomUniqueID();
|
||||
|
||||
if (shareLogRange) {
|
||||
if (g_random->random01() < 0.5) {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
|
||||
} else if (g_random->random01() < 0.75) {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, LiteralStringRef("\x7f")));
|
||||
} else {
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(LiteralStringRef("\x7f"), normalKeys.end));
|
||||
}
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, LiteralStringRef("\xfe\xff\xfe")));
|
||||
else
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(strinc(LiteralStringRef("\x00\x00\x01")), normalKeys.end));
|
||||
} else if(backupRangesCount <= 0) {
|
||||
if (beforePrefix)
|
||||
backupRanges.push_back_deep(backupRanges.arena(), KeyRangeRef(normalKeys.begin, std::min(backupPrefix, extraPrefix)));
|
||||
|
|
|
@ -7,19 +7,22 @@ testTitle=BackupAndRestore
|
|||
clearAfterTest=false
|
||||
|
||||
testName=BackupAndRestoreCorrectness
|
||||
backupTag=backup2
|
||||
backupAfter=20.0
|
||||
backupTag=backup1
|
||||
backupAfter=10.0
|
||||
restoreAfter=60.0
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
performRestore=false
|
||||
performRestore=true
|
||||
allowPauses=false
|
||||
|
||||
testName=BackupToDBCorrectness
|
||||
backupTag=backup3
|
||||
backupTag=backup2
|
||||
backupPrefix=b1
|
||||
backupAfter=15.0
|
||||
restoreAfter=60.0
|
||||
performRestore=false
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
shareLogRange=true
|
||||
extraDB=1
|
|
@ -0,0 +1,27 @@
|
|||
testTitle=BackupAndRestore
|
||||
testName=Cycle
|
||||
nodeCount=3000
|
||||
transactionsPerSecond=500.0
|
||||
testDuration=30.0
|
||||
expectedRate=0
|
||||
clearAfterTest=false
|
||||
|
||||
testName=BackupAndRestoreCorrectness
|
||||
backupTag=backup1
|
||||
backupAfter=10.0
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
performRestore=false
|
||||
allowPauses=false
|
||||
|
||||
testName=BackupToDBCorrectness
|
||||
backupTag=backup2
|
||||
backupPrefix=b2
|
||||
backupAfter=15.0
|
||||
restoreAfter=60.0
|
||||
performRestore=true
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToFileAndDB
|
||||
shareLogRange=true
|
||||
extraDB=1
|
Loading…
Reference in New Issue