Merge pull request #2901 from jzhou77/backup-cmd
Add pause/resume for new backups
This commit is contained in:
commit
867f734d8f
|
@ -2051,8 +2051,8 @@ ACTOR Future<Void> discontinueBackup(Database db, std::string tagName, bool wait
|
|||
|
||||
ACTOR Future<Void> changeBackupResumed(Database db, bool pause) {
|
||||
try {
|
||||
state FileBackupAgent backupAgent;
|
||||
wait(backupAgent.taskBucket->changePause(db, pause));
|
||||
FileBackupAgent backupAgent;
|
||||
wait(backupAgent.changePause(db, pause));
|
||||
printf("All backup agents have been %s.\n", pause ? "paused" : "resumed");
|
||||
}
|
||||
catch (Error& e) {
|
||||
|
|
|
@ -362,6 +362,9 @@ public:
|
|||
|
||||
Future<bool> checkActive(Database cx) { return taskBucket->checkActive(cx); }
|
||||
|
||||
// If "pause" is true, pause all backups; otherwise, resume all.
|
||||
Future<Void> changePause(Database db, bool pause);
|
||||
|
||||
friend class FileBackupAgentImpl;
|
||||
static const int dataFooterSize;
|
||||
|
||||
|
|
|
@ -4049,6 +4049,28 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> changePause(FileBackupAgent* backupAgent, Database db, bool pause) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(db));
|
||||
state Future<Void> change = backupAgent->taskBucket->changePause(db, pause);
|
||||
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
try {
|
||||
tr->set(backupPausedKey, pause ? LiteralStringRef("1") : LiteralStringRef("0"));
|
||||
wait(tr->commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
wait(change);
|
||||
TraceEvent("FileBackupAgentChangePaused").detail("Action", pause ? "Paused" : "Resumed");
|
||||
return Void();
|
||||
}
|
||||
|
||||
struct TimestampedVersion {
|
||||
Optional<Version> version;
|
||||
Optional<int64_t> epochs;
|
||||
|
@ -4652,3 +4674,7 @@ void FileBackupAgent::setLastRestorable(Reference<ReadYourWritesTransaction> tr,
|
|||
Future<int> FileBackupAgent::waitBackup(Database cx, std::string tagName, bool stopWhenDone, Reference<IBackupContainer> *pContainer, UID *pUID) {
|
||||
return FileBackupAgentImpl::waitBackup(this, cx, tagName, stopWhenDone, pContainer, pUID);
|
||||
}
|
||||
|
||||
Future<Void> FileBackupAgent::changePause(Database db, bool pause) {
|
||||
return FileBackupAgentImpl::changePause(this, db, pause);
|
||||
}
|
||||
|
|
|
@ -501,6 +501,7 @@ const KeyRangeRef backupProgressKeys(LiteralStringRef("\xff\x02/backupProgress/"
|
|||
LiteralStringRef("\xff\x02/backupProgress0"));
|
||||
const KeyRef backupProgressPrefix = backupProgressKeys.begin;
|
||||
const KeyRef backupStartedKey = LiteralStringRef("\xff\x02/backupStarted");
|
||||
extern const KeyRef backupPausedKey = LiteralStringRef("\xff\x02/backupPaused");
|
||||
|
||||
const Key backupProgressKeyFor(UID workerID) {
|
||||
BinaryWriter wr(Unversioned());
|
||||
|
|
|
@ -179,7 +179,7 @@ const Value workerListValue( ProcessData const& );
|
|||
Key decodeWorkerListKey( KeyRef const& );
|
||||
ProcessData decodeWorkerListValue( ValueRef const& );
|
||||
|
||||
// "\xff/backupProgress/[[workerID]]" := "[[WorkerBackupStatus]]"
|
||||
// "\xff\x02/backupProgress/[[workerID]]" := "[[WorkerBackupStatus]]"
|
||||
extern const KeyRangeRef backupProgressKeys;
|
||||
extern const KeyRef backupProgressPrefix;
|
||||
const Key backupProgressKeyFor(UID workerID);
|
||||
|
@ -187,11 +187,16 @@ const Value backupProgressValue(const WorkerBackupStatus& status);
|
|||
UID decodeBackupProgressKey(const KeyRef& key);
|
||||
WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);
|
||||
|
||||
// "\xff/backupStarted" := "[[vector<UID,Version1>]]"
|
||||
// The key to signal backup workers a new backup job is submitted.
|
||||
// "\xff\x02/backupStarted" := "[[vector<UID,Version1>]]"
|
||||
extern const KeyRef backupStartedKey;
|
||||
Value encodeBackupStartedValue(const std::vector<std::pair<UID, Version>>& ids);
|
||||
std::vector<std::pair<UID, Version>> decodeBackupStartedValue(const ValueRef& value);
|
||||
|
||||
// The key to signal backup workers that they should pause or resume.
|
||||
// "\xff\x02/backupPaused" := "[[0|1]]"
|
||||
extern const KeyRef backupPausedKey;
|
||||
|
||||
extern const KeyRef coordinatorsKey;
|
||||
extern const KeyRef logsKey;
|
||||
extern const KeyRef minRequiredCommitVersionKey;
|
||||
|
|
|
@ -82,6 +82,7 @@ struct BackupData {
|
|||
bool pulling = false;
|
||||
bool stopped = false;
|
||||
bool exitEarly = false; // If the worker is on an old epoch and all backups starts a version >= the endVersion
|
||||
AsyncVar<bool> paused; // Track if "backupPausedKey" is set.
|
||||
|
||||
struct PerBackupInfo {
|
||||
PerBackupInfo() = default;
|
||||
|
@ -223,7 +224,7 @@ struct BackupData {
|
|||
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
||||
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
||||
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1),
|
||||
cc("BackupWorker", myId.toString()), pulledVersion(0) {
|
||||
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) {
|
||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
||||
|
||||
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
|
||||
|
@ -797,6 +798,10 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
|||
|
||||
TraceEvent("BackupWorkerPull", self->myId);
|
||||
loop {
|
||||
while (self->paused.get()) {
|
||||
wait(self->paused.onChange());
|
||||
}
|
||||
|
||||
loop choose {
|
||||
when (wait(r ? r->getMore(TaskPriority::TLogCommit) : Never())) {
|
||||
break;
|
||||
|
@ -856,7 +861,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
|
|||
wait(self->pulledVersion.whenAtLeast(currentVersion));
|
||||
pullFinished = Future<Void>(); // cancels pullAsyncData()
|
||||
self->pulling = false;
|
||||
TraceEvent("BackupWorkerPaused", self->myId);
|
||||
TraceEvent("BackupWorkerPaused", self->myId).detail("Reson", "NoBackup");
|
||||
} else {
|
||||
// Backup key is not present, enter this NOOP POP mode.
|
||||
state Future<Version> committedVersion = self->getMinKnownCommittedVersion();
|
||||
|
@ -899,6 +904,33 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch r
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorWorkerPause(BackupData* self) {
|
||||
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
|
||||
state Future<Void> watch;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
Optional<Value> value = wait(tr->get(backupPausedKey));
|
||||
bool paused = value.present() && value.get() == LiteralStringRef("1");
|
||||
if (self->paused.get() != paused) {
|
||||
TraceEvent(paused ? "BackupWorkerPaused" : "BackupWorkerResumed", self->myId);
|
||||
self->paused.set(paused);
|
||||
}
|
||||
|
||||
watch = tr->watch(backupPausedKey);
|
||||
wait(tr->commit());
|
||||
wait(watch);
|
||||
tr->reset();
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest req,
|
||||
Reference<AsyncVar<ServerDBInfo>> db) {
|
||||
state BackupData self(interf.id(), db, req);
|
||||
|
@ -921,6 +953,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
|
|||
if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) {
|
||||
addActor.send(monitorBackupProgress(&self));
|
||||
}
|
||||
addActor.send(monitorWorkerPause(&self));
|
||||
|
||||
// Check if backup key is present to avoid race between this check and
|
||||
// noop pop as well as upload data: pop or skip upload before knowing
|
||||
|
|
|
@ -136,9 +136,9 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
ACTOR static Future<Void> changePaused(Database cx, FileBackupAgent* backupAgent) {
|
||||
loop {
|
||||
wait(backupAgent->taskBucket->changePause(cx, true));
|
||||
wait(backupAgent->changePause(cx, true));
|
||||
wait(delay(30 * deterministicRandom()->random01()));
|
||||
wait(backupAgent->taskBucket->changePause(cx, false));
|
||||
wait(backupAgent->changePause(cx, false));
|
||||
wait(delay(120 * deterministicRandom()->random01()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -180,10 +180,10 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
|
|||
|
||||
ACTOR static Future<Void> changePaused(Database cx, FileBackupAgent* backupAgent) {
|
||||
loop {
|
||||
wait( backupAgent->taskBucket->changePause(cx, true) );
|
||||
wait( delay(30*deterministicRandom()->random01()) );
|
||||
wait( backupAgent->taskBucket->changePause(cx, false) );
|
||||
wait( delay(120*deterministicRandom()->random01()) );
|
||||
wait(backupAgent->changePause(cx, true));
|
||||
wait(delay(30 * deterministicRandom()->random01()));
|
||||
wait(backupAgent->changePause(cx, false));
|
||||
wait(delay(120 * deterministicRandom()->random01()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue