From be1d36bed3f7aa763cc2da7e0b9cfb0df8b97e9c Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 6 Mar 2020 11:58:10 -0800 Subject: [PATCH] Backup worker updates latest log versions in BackupConfig If backup worker is enabled, the current epoch's worker of tag (-2,0) will be responsible for monitoring the backup progress of all workers and updating the BackupConfig with the latest saved log version, which is the minimum version of all tags. This change has been incorporated in the getLatestRestorableVersion() so that it is transparent to clients. --- fdbclient/BackupAgent.actor.h | 20 +++++++++-- fdbclient/FileBackupAgent.actor.cpp | 6 +++- fdbserver/BackupWorker.actor.cpp | 53 ++++++++++++++++++++++++----- 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index 9ef90976d1..896cd32509 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -787,6 +787,16 @@ public: return configSpace.pack(LiteralStringRef(__FUNCTION__)); } + // Set to true if backup worker is enabled. + KeyBackedProperty backupWorkerEnabled() { + return configSpace.pack(LiteralStringRef(__FUNCTION__)); + } + + // Latest version for which all prior versions have been saved by backup workers. 
+ KeyBackedProperty latestBackupWorkerSavedVersion() { + return configSpace.pack(LiteralStringRef(__FUNCTION__)); + } + // Stop differntial logging if already started or don't start after completing KV ranges KeyBackedProperty stopWhenDone() { return configSpace.pack(LiteralStringRef(__FUNCTION__)); @@ -816,10 +826,14 @@ public: tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); auto lastLog = latestLogEndVersion().get(tr); auto firstSnapshot = firstSnapshotEndVersion().get(tr); - return map(success(lastLog) && success(firstSnapshot), [=](Void) -> Optional { + auto enabled = backupWorkerEnabled().get(tr); + auto workerVersion = latestBackupWorkerSavedVersion().get(tr); + return map(success(lastLog) && success(firstSnapshot) && success(enabled) && success(workerVersion), [=](Void) -> Optional { // The latest log greater than the oldest snapshot is the restorable version - if(lastLog.get().present() && firstSnapshot.get().present() && lastLog.get().get() > firstSnapshot.get().get()) { - return std::max(lastLog.get().get() - 1, firstSnapshot.get().get()); + Optional logVersion = + enabled.get().present() && enabled.get().get() ? 
workerVersion.get() : lastLog.get(); + if (logVersion.present() && firstSnapshot.get().present() && logVersion.get() > firstSnapshot.get().get()) { + return std::max(logVersion.get() - 1, firstSnapshot.get().get()); } return {}; }); diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index 8c58cbc162..66f584e085 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -2388,7 +2388,8 @@ namespace fileBackup { // Check if backup worker is enabled DatabaseConfiguration dbConfig = wait(getDatabaseConfiguration(cx)); - if (!dbConfig.backupWorkerEnabled) { + state bool backupWorkerEnabled = dbConfig.backupWorkerEnabled; + if (!backupWorkerEnabled) { wait(success(changeConfig(cx, "backup_worker_enabled:=1", true))); } @@ -2420,6 +2421,9 @@ namespace fileBackup { } tr->set(backupStartedKey, encodeBackupStartedValue(ids)); + if (backupWorkerEnabled) { + config.backupWorkerEnabled().set(tr, true); + } // The task may be restarted. Set the watch if started key has NOT been set. if (!taskStarted.get().present()) { diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index cc482d7984..fb8226c5fb 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -305,7 +305,10 @@ ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool started // set the "allWorkerStarted" key of the BackupConfig to true, which in turn // unblocks StartFullBackupTaskFunc::_execute. Note only worker with Tag (-2,0) // runs this actor so that the key is set by one process. -ACTOR Future monitorAllWorkerStarted(BackupData* self) { +// Additionally, this actor updates the saved version for each BackupConfig in +// the system space so that the client can know if a backup is restorable -- +// log saved version > snapshot version. 
+ACTOR Future monitorAllWorkerProgress(BackupData* self) { loop { wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL / 2.0) || self->changedTrigger.onTrigger()); if (self->backups.empty()) { @@ -319,23 +322,32 @@ ACTOR Future monitorAllWorkerStarted(BackupData* self) { std::map tagVersions = progress->getEpochStatus(self->recruitedEpoch); state std::vector ready; + state std::map savedLogVersions; if (tagVersions.size() == self->logSystem.get()->getLogRouterTags()) { // Check every version is larger than backup's startVersion - for (auto& uidInfo : self->backups) { - if (uidInfo.second.allWorkerStarted) continue; + for (auto& [uid, info] : self->backups) { + if (info.allWorkerStarted) { + // update progress so far + Version v = std::numeric_limits::max(); + for (const auto [tag, version] : tagVersions) { + v = std::min(v, version); + } + savedLogVersions.emplace(uid, v); + continue; + } bool saved = true; for (const std::pair tv : tagVersions) { - if (tv.second < uidInfo.second.startVersion) { + if (tv.second < info.startVersion) { saved = false; break; } } if (saved) { - ready.push_back(uidInfo.first); - uidInfo.second.allWorkerStarted = true; + ready.push_back(uid); + info.allWorkerStarted = true; } } - if (ready.empty()) continue; + if (ready.empty() && savedLogVersions.empty()) continue; // Set "allWorkerStarted" key for ready backups loop { @@ -350,13 +362,36 @@ ACTOR Future monitorAllWorkerStarted(BackupData* self) { configs.emplace_back(uid); readyValues.push_back(tr->get(configs.back().allWorkerStarted().key)); } - wait(waitForAll(readyValues)); + + state std::vector>> prevVersions; + state std::vector versionConfigs; + for (const auto [uid, version] : savedLogVersions) { + versionConfigs.emplace_back(uid); + prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr)); + } + + wait(waitForAll(readyValues) && waitForAll(prevVersions)); + for (int i = 0; i < readyValues.size(); i++) { if (!readyValues[i].get().present()) { 
configs[i].allWorkerStarted().set(tr, true); TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", ready[i].toString()); } } + + for (int i = 0; i < prevVersions.size(); i++) { + const Version current = savedLogVersions[versionConfigs[i].getUid()]; + if (prevVersions[i].get().present()) { + const Version prev = prevVersions[i].get().get(); + ASSERT(prev <= current); + } + if (!prevVersions[i].get().present() || prevVersions[i].get().get() < current) { + TraceEvent("BackupWorkerSetVersion", self->myId) + .detail("BackupID", versionConfigs[i].getUid()) + .detail("Version", current); + versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current); + } + } wait(tr->commit()); break; } catch (Error& e) { @@ -735,7 +770,7 @@ ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest addActor.send(checkRemoved(db, req.recruitedEpoch, &self)); addActor.send(waitFailureServer(interf.waitFailure.getFuture())); if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) { - addActor.send(monitorAllWorkerStarted(&self)); + addActor.send(monitorAllWorkerProgress(&self)); } // Check if backup key is present to avoid race between this check and