diff --git a/fdbbackup/backup.actor.cpp b/fdbbackup/backup.actor.cpp index 617730a871..73d447774e 100644 --- a/fdbbackup/backup.actor.cpp +++ b/fdbbackup/backup.actor.cpp @@ -2051,8 +2051,8 @@ ACTOR Future discontinueBackup(Database db, std::string tagName, bool wait ACTOR Future changeBackupResumed(Database db, bool pause) { try { - state FileBackupAgent backupAgent; - wait(backupAgent.taskBucket->changePause(db, pause)); + FileBackupAgent backupAgent; + wait(backupAgent.changePause(db, pause)); printf("All backup agents have been %s.\n", pause ? "paused" : "resumed"); } catch (Error& e) { diff --git a/fdbclient/BackupAgent.actor.h b/fdbclient/BackupAgent.actor.h index 671f7e3a4d..e308aa43a4 100644 --- a/fdbclient/BackupAgent.actor.h +++ b/fdbclient/BackupAgent.actor.h @@ -362,6 +362,9 @@ public: Future checkActive(Database cx) { return taskBucket->checkActive(cx); } + // If "pause" is true, pause all backups; otherwise, resume all. + Future changePause(Database db, bool pause); + friend class FileBackupAgentImpl; static const int dataFooterSize; diff --git a/fdbclient/FileBackupAgent.actor.cpp b/fdbclient/FileBackupAgent.actor.cpp index e19cda2718..e16863791f 100644 --- a/fdbclient/FileBackupAgent.actor.cpp +++ b/fdbclient/FileBackupAgent.actor.cpp @@ -4049,6 +4049,28 @@ public: return Void(); } + ACTOR static Future changePause(FileBackupAgent* backupAgent, Database db, bool pause) { + state Reference tr(new ReadYourWritesTransaction(db)); + state Future change = backupAgent->taskBucket->changePause(db, pause); + + loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + try { + tr->set(backupPausedKey, pause ? LiteralStringRef("1") : LiteralStringRef("0")); + wait(tr->commit()); + break; + } catch (Error& e) { + wait(tr->onError(e)); + } + } + wait(change); + TraceEvent("FileBackupAgentChangePaused").detail("Action", pause ? "Paused" : "Resumed"); + return Void(); + } + struct TimestampedVersion { Optional version; Optional epochs; @@ -4652,3 +4674,7 @@ void FileBackupAgent::setLastRestorable(Reference tr, Future FileBackupAgent::waitBackup(Database cx, std::string tagName, bool stopWhenDone, Reference *pContainer, UID *pUID) { return FileBackupAgentImpl::waitBackup(this, cx, tagName, stopWhenDone, pContainer, pUID); } + +Future FileBackupAgent::changePause(Database db, bool pause) { + return FileBackupAgentImpl::changePause(this, db, pause); +} diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 7168f3605f..f393dd83ca 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -501,6 +501,7 @@ const KeyRangeRef backupProgressKeys(LiteralStringRef("\xff\x02/backupProgress/" LiteralStringRef("\xff\x02/backupProgress0")); const KeyRef backupProgressPrefix = backupProgressKeys.begin; const KeyRef backupStartedKey = LiteralStringRef("\xff\x02/backupStarted"); +extern const KeyRef backupPausedKey = LiteralStringRef("\xff\x02/backupPaused"); const Key backupProgressKeyFor(UID workerID) { BinaryWriter wr(Unversioned()); diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 6b9de0fbf1..3b55731ac0 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -179,7 +179,7 @@ const Value workerListValue( ProcessData const& ); Key decodeWorkerListKey( KeyRef const& ); ProcessData decodeWorkerListValue( ValueRef const& ); -// "\xff/backupProgress/[[workerID]]" := "[[WorkerBackupStatus]]" +// "\xff\x02/backupProgress/[[workerID]]" := "[[WorkerBackupStatus]]" extern const KeyRangeRef backupProgressKeys; extern const KeyRef backupProgressPrefix; const Key backupProgressKeyFor(UID workerID); @@ -187,11 +187,16 @@ const Value backupProgressValue(const WorkerBackupStatus& status); UID decodeBackupProgressKey(const KeyRef& key); WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value); -// "\xff/backupStarted" := "[[vector]]" +// The key to signal backup workers a new backup job is submitted. +// "\xff\x02/backupStarted" := "[[vector]]" extern const KeyRef backupStartedKey; Value encodeBackupStartedValue(const std::vector>& ids); std::vector> decodeBackupStartedValue(const ValueRef& value); +// The key to signal backup workers that they should pause or resume. +// "\xff\x02/backupPaused" := "[[0|1]]" +extern const KeyRef backupPausedKey; + extern const KeyRef coordinatorsKey; extern const KeyRef logsKey; extern const KeyRef minRequiredCommitVersionKey; diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 4fd8810b22..20f6e363f3 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -82,6 +82,7 @@ struct BackupData { bool pulling = false; bool stopped = false; bool exitEarly = false; // If the worker is on an old epoch and all backups starts a version >= the endVersion + AsyncVar paused; // Track if "backupPausedKey" is set. struct PerBackupInfo { PerBackupInfo() = default; @@ -223,7 +224,7 @@ struct BackupData { : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), - cc("BackupWorker", myId.toString()), pulledVersion(0) { + cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) { cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true); specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; }); @@ -797,6 +798,10 @@ ACTOR Future pullAsyncData(BackupData* self) { TraceEvent("BackupWorkerPull", self->myId); loop { + while (self->paused.get()) { + wait(self->paused.onChange()); + } + loop choose { when (wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) { break; @@ -856,7 +861,7 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) wait(self->pulledVersion.whenAtLeast(currentVersion)); pullFinished = Future(); // cancels pullAsyncData() self->pulling = false; - TraceEvent("BackupWorkerPaused", self->myId); + TraceEvent("BackupWorkerPaused", self->myId).detail("Reson", "NoBackup"); } else { // Backup key is not present, enter this NOOP POP mode. state Future committedVersion = self->getMinKnownCommittedVersion(); @@ -899,6 +904,33 @@ ACTOR Future checkRemoved(Reference> db, LogEpoch r } } +ACTOR static Future monitorWorkerPause(BackupData* self) { + state Reference tr(new ReadYourWritesTransaction(self->cx)); + state Future watch; + + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::LOCK_AWARE); + tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + Optional value = wait(tr->get(backupPausedKey)); + bool paused = value.present() && value.get() == LiteralStringRef("1"); + if (self->paused.get() != paused) { + TraceEvent(paused ? "BackupWorkerPaused" : "BackupWorkerResumed", self->myId); + self->paused.set(paused); + } + + watch = tr->watch(backupPausedKey); + wait(tr->commit()); + wait(watch); + tr->reset(); + } catch (Error& e) { + wait(tr->onError(e)); + } + } +} + ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest req, Reference> db) { state BackupData self(interf.id(), db, req); @@ -921,6 +953,7 @@ ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) { addActor.send(monitorBackupProgress(&self)); } + addActor.send(monitorWorkerPause(&self)); // Check if backup key is present to avoid race between this check and // noop pop as well as upload data: pop or skip upload before knowing diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index 8d90980eb6..51f1b0ea3e 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -136,9 +136,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { ACTOR static Future changePaused(Database cx, FileBackupAgent* backupAgent) { loop { - wait(backupAgent->taskBucket->changePause(cx, true)); + wait(backupAgent->changePause(cx, true)); wait(delay(30 * deterministicRandom()->random01())); - wait(backupAgent->taskBucket->changePause(cx, false)); + wait(backupAgent->changePause(cx, false)); wait(delay(120 * deterministicRandom()->random01())); } } diff --git a/fdbserver/workloads/BackupCorrectness.actor.cpp b/fdbserver/workloads/BackupCorrectness.actor.cpp index e0bc309ab1..73c958b328 100644 --- a/fdbserver/workloads/BackupCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupCorrectness.actor.cpp @@ -180,10 +180,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload { ACTOR static Future changePaused(Database cx, FileBackupAgent* backupAgent) { loop { - wait( backupAgent->taskBucket->changePause(cx, true) ); - wait( delay(30*deterministicRandom()->random01()) ); - wait( backupAgent->taskBucket->changePause(cx, false) ); - wait( delay(120*deterministicRandom()->random01()) ); + wait(backupAgent->changePause(cx, true)); + wait(delay(30 * deterministicRandom()->random01())); + wait(backupAgent->changePause(cx, false)); + wait(delay(120 * deterministicRandom()->random01())); } }