Backup worker updates latest log versions in BackupConfig

If backup worker is enabled, the current epoch's worker of tag (-2,0) will be
responsible for monitoring the backup progress of all workers and update the
BackupConfig with the latest saved log version, which is the minimum version
of all tags.

This change has been incorporated in the getLatestRestorableVersion() so that
it is transparent to clients.
This commit is contained in:
Jingyu Zhou 2020-03-06 11:58:10 -08:00
parent 15437ffb53
commit be1d36bed3
3 changed files with 66 additions and 13 deletions

View File

@ -787,6 +787,16 @@ public:
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Set to true if backup worker is enabled.
KeyBackedProperty<bool> backupWorkerEnabled() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Latest version for which all prior versions have saved by backup workers.
KeyBackedProperty<Version> latestBackupWorkerSavedVersion() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
}
// Stop differntial logging if already started or don't start after completing KV ranges
KeyBackedProperty<bool> stopWhenDone() {
return configSpace.pack(LiteralStringRef(__FUNCTION__));
@ -816,10 +826,14 @@ public:
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
auto lastLog = latestLogEndVersion().get(tr);
auto firstSnapshot = firstSnapshotEndVersion().get(tr);
return map(success(lastLog) && success(firstSnapshot), [=](Void) -> Optional<Version> {
auto enabled = backupWorkerEnabled().get(tr);
auto workerVersion = latestBackupWorkerSavedVersion().get(tr);
return map(success(lastLog) && success(firstSnapshot) && success(enabled) && success(workerVersion), [=](Void) -> Optional<Version> {
// The latest log greater than the oldest snapshot is the restorable version
if(lastLog.get().present() && firstSnapshot.get().present() && lastLog.get().get() > firstSnapshot.get().get()) {
return std::max(lastLog.get().get() - 1, firstSnapshot.get().get());
Optional<Version> logVersion =
enabled.get().present() && enabled.get().get() ? workerVersion.get() : lastLog.get();
if (logVersion.present() && firstSnapshot.get().present() && logVersion.get() > firstSnapshot.get().get()) {
return std::max(logVersion.get() - 1, firstSnapshot.get().get());
}
return {};
});

View File

@ -2388,7 +2388,8 @@ namespace fileBackup {
// Check if backup worker is enabled
DatabaseConfiguration dbConfig = wait(getDatabaseConfiguration(cx));
if (!dbConfig.backupWorkerEnabled) {
state bool backupWorkerEnabled = dbConfig.backupWorkerEnabled;
if (!backupWorkerEnabled) {
wait(success(changeConfig(cx, "backup_worker_enabled:=1", true)));
}
@ -2420,6 +2421,9 @@ namespace fileBackup {
}
tr->set(backupStartedKey, encodeBackupStartedValue(ids));
if (backupWorkerEnabled) {
config.backupWorkerEnabled().set(tr, true);
}
// The task may be restarted. Set the watch if started key has NOT been set.
if (!taskStarted.get().present()) {

View File

@ -305,7 +305,10 @@ ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool started
// set the "allWorkerStarted" key of the BackupConfig to true, which in turn
// unblocks StartFullBackupTaskFunc::_execute. Note only worker with Tag (-2,0)
// runs this actor so that the key is set by one process.
ACTOR Future<Void> monitorAllWorkerStarted(BackupData* self) {
// Additionally, this actor updates the saved version for each BackupConfig in
// the system space so that the client can know if a backup is restorable --
// log saved version > snapshot version.
ACTOR Future<Void> monitorAllWorkerProgress(BackupData* self) {
loop {
wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL / 2.0) || self->changedTrigger.onTrigger());
if (self->backups.empty()) {
@ -319,23 +322,32 @@ ACTOR Future<Void> monitorAllWorkerStarted(BackupData* self) {
std::map<Tag, Version> tagVersions = progress->getEpochStatus(self->recruitedEpoch);
state std::vector<UID> ready;
state std::map<UID, Version> savedLogVersions;
if (tagVersions.size() == self->logSystem.get()->getLogRouterTags()) {
// Check every version is larger than backup's startVersion
for (auto& uidInfo : self->backups) {
if (uidInfo.second.allWorkerStarted) continue;
for (auto& [uid, info] : self->backups) {
if (info.allWorkerStarted) {
// update update progress so far
Version v = std::numeric_limits<Version>::max();
for (const auto [tag, version] : tagVersions) {
v = std::min(v, version);
}
savedLogVersions.emplace(uid, v);
continue;
}
bool saved = true;
for (const std::pair<Tag, Version> tv : tagVersions) {
if (tv.second < uidInfo.second.startVersion) {
if (tv.second < info.startVersion) {
saved = false;
break;
}
}
if (saved) {
ready.push_back(uidInfo.first);
uidInfo.second.allWorkerStarted = true;
ready.push_back(uid);
info.allWorkerStarted = true;
}
}
if (ready.empty()) continue;
if (ready.empty() && savedLogVersions.empty()) continue;
// Set "allWorkerStarted" key for ready backups
loop {
@ -350,13 +362,36 @@ ACTOR Future<Void> monitorAllWorkerStarted(BackupData* self) {
configs.emplace_back(uid);
readyValues.push_back(tr->get(configs.back().allWorkerStarted().key));
}
wait(waitForAll(readyValues));
state std::vector<Future<Optional<Version>>> prevVersions;
state std::vector<BackupConfig> versionConfigs;
for (const auto [uid, version] : savedLogVersions) {
versionConfigs.emplace_back(uid);
prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr));
}
wait(waitForAll(readyValues) && waitForAll(prevVersions));
for (int i = 0; i < readyValues.size(); i++) {
if (!readyValues[i].get().present()) {
configs[i].allWorkerStarted().set(tr, true);
TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", ready[i].toString());
}
}
for (int i = 0; i < prevVersions.size(); i++) {
const Version current = savedLogVersions[versionConfigs[i].getUid()];
if (prevVersions[i].get().present()) {
const Version prev = prevVersions[i].get().get();
ASSERT(prev <= current);
}
if (!prevVersions[i].get().present() || prevVersions[i].get().get() < current) {
TraceEvent("BackupWorkerSetVersion", self->myId)
.detail("BackupID", versionConfigs[i].getUid())
.detail("Version", current);
versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current);
}
}
wait(tr->commit());
break;
} catch (Error& e) {
@ -735,7 +770,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
addActor.send(checkRemoved(db, req.recruitedEpoch, &self));
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) {
addActor.send(monitorAllWorkerStarted(&self));
addActor.send(monitorAllWorkerProgress(&self));
}
// Check if backup key is present to avoid race between this check and