Make sure only current epoch's backup workers update all workers
So that backup workers from old epochs don't mess with the list of all workers.
parent 44c1996950
commit 33ea027f84
@@ -90,7 +90,10 @@ struct BackupData {
 		BackupConfig config(uid);
 		container = config.backupContainer().get(data->cx);
 		ranges = config.backupRanges().get(data->cx);
-		updateWorker = _updateStartedWorkers(this, data, uid);
+		if (self->backupEpoch == self->recruitedEpoch) {
+			// Only the current epoch's workers update the number of backup workers.
+			updateWorker = _updateStartedWorkers(this, data, uid);
+		}
 		TraceEvent("BackupWorkerAddJob", data->myId).detail("BackupID", uid).detail("Version", v);
 	}
 
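The guard distinguishes the two epochs each worker carries: backupEpoch, the epoch whose log data it saves, and recruitedEpoch, the epoch in which it was recruited. Per the commit message, a worker recruited in the current epoch to drain an old epoch's data (backupEpoch < recruitedEpoch) must now stay out of the started-workers list. A standalone sketch of the condition (plain C++, not Flow; field names mirror the diff):

    #include <cstdint>
    #include <iostream>

    struct BackupWorker {
        int64_t backupEpoch;    // epoch whose mutations this worker saves
        int64_t recruitedEpoch; // epoch in which this worker was recruited
        // Mirrors the new condition: only current epoch's workers register.
        bool shouldRegister() const { return backupEpoch == recruitedEpoch; }
    };

    int main() {
        BackupWorker current{ 5, 5 }; // recruited in epoch 5 for epoch 5
        BackupWorker old{ 4, 5 };     // recruited in epoch 5 for old epoch 4
        std::cout << current.shouldRegister() << "\n"; // 1: joins the list
        std::cout << old.shouldRegister() << "\n";     // 0: stays out of the list
    }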
@@ -112,11 +115,12 @@ struct BackupData {
 	// writes (epoch, tag.id) into the key. Worker 0 monitors the key and once
 	// all workers have updated the key, this backup is considered as started
 	// (i.e., the "submitBackup" call is successful). Worker 0 then sets
-	// the "allWorkerStarted" flag.
+	// the "allWorkerStarted" flag, which in turn unblocks
+	// StartFullBackupTaskFunc::_execute.
 	ACTOR static Future<Void> _updateStartedWorkers(PerBackupInfo* info, BackupData* self, UID uid) {
 		state BackupConfig config(uid);
 		state Future<Void> watchFuture;
-		state bool updated = false; // worker 0 has updated
+		state bool updated = false;
 		state bool firstWorker = info->self->tag.id == 0;
 		state bool allUpdated = false;
 		state Optional<std::vector<std::pair<int64_t, int64_t>>> workers;
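The comment above describes the startup handshake: every worker writes its (epoch, tag.id) pair under the startedBackupWorkers key, and worker 0 watches the key until every tag of the current epoch is present. A toy model of worker 0's completeness check (plain C++ without transactions; totalTags is an assumed parameter, not a name from the source):

    #include <cstdint>
    #include <set>
    #include <utility>
    #include <vector>

    // Returns true once every tag id in [0, totalTags) of `epoch` has registered.
    bool allWorkersStarted(const std::vector<std::pair<int64_t, int64_t>>& started,
                           int64_t epoch, int64_t totalTags) {
        std::set<int64_t> tags;
        for (const auto& [e, tag] : started) {
            if (e == epoch) tags.insert(tag);
        }
        return static_cast<int64_t>(tags.size()) == totalTags;
    }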
@@ -141,7 +145,7 @@ struct BackupData {
 				if (firstWorker) {
 					std::vector<std::pair<int64_t, int64_t>>& v = workers.get();
 					v.erase(std::remove_if(v.begin(), v.end(),
-					                       [epoch = self->recruitedEpoch](const std::pair<LogEpoch, int32_t>& p) {
+					                       [epoch = self->recruitedEpoch](const std::pair<int64_t, int64_t>& p) {
 						                       return p.first != epoch;
 					                       }),
 					        v.end());
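This fixes the lambda's parameter type to match the element type of the vector it filters (std::pair<int64_t, int64_t> rather than std::pair<LogEpoch, int32_t>), and it is how worker 0 drops stale registrations from old epochs. The erase-remove idiom in isolation, as a runnable sketch:

    #include <algorithm>
    #include <cstdint>
    #include <utility>
    #include <vector>

    int main() {
        // (epoch, tag id) pairs; the entry from epoch 4 is stale.
        std::vector<std::pair<int64_t, int64_t>> workers = { { 4, 0 }, { 5, 0 }, { 5, 1 } };
        const int64_t epoch = 5;
        workers.erase(std::remove_if(workers.begin(), workers.end(),
                                     [epoch](const std::pair<int64_t, int64_t>& p) {
                                         return p.first != epoch;
                                     }),
                      workers.end());
        // workers now holds only { 5, 0 } and { 5, 1 }.
    }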
@@ -152,7 +156,10 @@ struct BackupData {
 					// monitor all workers' updates
 					watchFuture = tr->watch(config.startedBackupWorkers().key);
 				}
-				config.startedBackupWorkers().set(tr, workers.get());
+				ASSERT(workers.present() && workers.get().size() > 0);
+				if (!updated) {
+					config.startedBackupWorkers().set(tr, workers.get());
+				}
 				wait(tr->commit());
 
 				updated = true; // Only set to true after commit.
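The `updated` flag makes the write retry-safe: the transaction loop re-runs after errors, and a commit() that throws may or may not have actually applied, so the flag flips only after commit() returns and the write is skipped once a commit has provably succeeded. A simplified sketch of the pattern (plain C++ with hypothetical write/commit/done callables, not the Flow API):

    #include <functional>
    #include <stdexcept>

    // The loop keeps running even after a successful commit (the real actor
    // continues in order to watch the key), which is why the write is guarded:
    // once `updated` is true, retries must not write the value again.
    void runWithRetries(const std::function<void()>& write,
                        const std::function<void()>& commit,
                        const std::function<bool()>& done) {
        bool updated = false; // a commit has definitely succeeded
        while (!done()) {
            try {
                if (!updated) write();
                commit();
                updated = true; // only set to true after commit
            } catch (const std::runtime_error&) {
                // commit() may or may not have applied; `updated` stays as-is
                // and the next attempt repeats the (idempotent) write if needed.
            }
        }
    }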
@@ -161,6 +168,7 @@ struct BackupData {
 				}
 				wait(watchFuture);
 			} else {
+				ASSERT(workers.present() && workers.get().size() > 0);
 				config.startedBackupWorkers().set(tr, workers.get());
 				wait(tr->commit());
 				break;
@@ -170,7 +178,7 @@ struct BackupData {
 				allUpdated = false;
 			}
 		}
-		TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", uid.toString());
+		TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", uid).detail("TagId", self->tag.id);
 		return Void();
 	}
 
@@ -444,14 +452,11 @@ ACTOR Future<Void> setBackupKeys(BackupData* self, std::map<UID, Version> savedL
 	}
 }
 
-// Monitor all backup worker in the recruited epoch has been started. If so,
-// set the "allWorkerStarted" key of the BackupConfig to true, which in turn
-// unblocks StartFullBackupTaskFunc::_execute. Note only worker with Tag (-2,0)
-// runs this actor so that the key is set by one process.
-// Additionally, this actor updates the saved version for each BackupConfig in
-// the system space so that the client can know if a backup is restorable --
+// Note only worker with Tag (-2,0) runs this actor so that the latest saved
+// version key is set by one process, which is stored in each BackupConfig in
+// the system space. The client can know if a backup is restorable by checking
 // log saved version > snapshot version.
-ACTOR Future<Void> monitorAllWorkerProgress(BackupData* self) {
+ACTOR Future<Void> monitorBackupProgress(BackupData* self) {
 	state Future<Void> interval;
 
 	loop {
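The rewritten comment states the restorability criterion directly; as code it is just a version comparison (simplified sketch, the real check lives in the backup client):

    #include <cstdint>

    using Version = int64_t;

    // A backup is restorable once logs have been saved past the snapshot:
    // log saved version > snapshot version.
    bool isRestorable(Version savedLogVersion, Version snapshotVersion) {
        return savedLogVersion > snapshotVersion;
    }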
@@ -474,9 +479,6 @@ ACTOR Future<Void> monitorAllWorkerProgress(BackupData* self) {
 
 		// Check every version is larger than backup's startVersion
 		for (auto& [uid, info] : self->backups) {
-			TraceEvent("BackupWorkerSavedBackupVersion", self->myId)
-			    .detail("BackupID", uid.toString())
-			    .detail("Done", finishedPreviousEpochs);
 			if (finishedPreviousEpochs) {
 				// update progress so far
 				Version v = std::numeric_limits<Version>::max();
@@ -487,7 +489,6 @@ ACTOR Future<Void> monitorAllWorkerProgress(BackupData* self) {
 				TraceEvent("BackupWorkerSavedBackupVersion", self->myId).detail("BackupID", uid).detail("Version", v);
 			}
 		}
-		TraceEvent("BackupWorkerSavedBackupVersion", self->myId).detail("Size", savedLogVersions.size());
 		Future<Void> setKeys = savedLogVersions.empty() ? Void() : setBackupKeys(self, savedLogVersions);
 
 		wait(interval && setKeys);
@@ -860,7 +861,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
 	addActor.send(checkRemoved(db, req.recruitedEpoch, &self));
 	addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
 	if (req.recruitedEpoch == req.backupEpoch && req.routerTag.id == 0) {
-		addActor.send(monitorAllWorkerProgress(&self));
+		addActor.send(monitorBackupProgress(&self));
 	}
 
 	// Check if backup key is present to avoid race between this check and
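Only one process should run the renamed monitorBackupProgress: the worker recruited for the current epoch that holds router tag id 0 (the tag the comments print as (-2,0)). The gate as a standalone predicate (field names taken from the diff; a sketch, not the actual interface):

    #include <cstdint>

    // True only for the single current-epoch worker with router tag id 0.
    bool shouldMonitorBackupProgress(int64_t recruitedEpoch, int64_t backupEpoch,
                                     int64_t routerTagId) {
        return recruitedEpoch == backupEpoch && routerTagId == 0;
    }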