Updates lastest backup worker progress after all previous epochs are done

If workers for previous epochs are still ongoing, we may end up with a
container that miss mutations in previous epochs. So the update only happens
after there are only current epoch's backup workers.
This commit is contained in:
Jingyu Zhou 2020-03-12 20:51:10 -07:00
parent 4c75c61f39
commit 9ea549ba7d
1 changed files with 81 additions and 74 deletions

View File

@ -330,95 +330,102 @@ ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool started
// log saved version > snapshot version. // log saved version > snapshot version.
ACTOR Future<Void> monitorAllWorkerProgress(BackupData* self) { ACTOR Future<Void> monitorAllWorkerProgress(BackupData* self) {
loop { loop {
wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL / 2.0) || self->changedTrigger.onTrigger()); while (self->backups.empty() || !self->logSystem.get()) {
if (self->backups.empty()) { wait(delay(SERVER_KNOBS->WORKER_LOGGING_INTERVAL / 2.0) || self->changedTrigger.onTrigger() ||
continue; self->logSystem.onChange());
} }
// check all workers have started by checking their progress is larger // check all workers have started by checking their progress is larger
// than the backup's start version. // than the backup's start version.
state Reference<BackupProgress> progress(new BackupProgress(self->myId, {})); state Reference<BackupProgress> progress(
new BackupProgress(self->myId, self->logSystem.get()->getOldEpochTagsVersionsInfo()));
wait(getBackupProgress(self->cx, self->myId, progress)); wait(getBackupProgress(self->cx, self->myId, progress));
std::map<Tag, Version> tagVersions = progress->getEpochStatus(self->recruitedEpoch); std::map<Tag, Version> tagVersions = progress->getEpochStatus(self->recruitedEpoch);
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit =
progress->getUnfinishedBackup();
bool finishedPreviousEpochs =
toRecruit.empty() || std::get<0>(toRecruit.begin()->first) == self->recruitedEpoch;
state std::vector<UID> ready; state std::vector<UID> ready;
state std::map<UID, Version> savedLogVersions; state std::map<UID, Version> savedLogVersions;
if (tagVersions.size() == self->logSystem.get()->getLogRouterTags()) { if (tagVersions.size() != self->logSystem.get()->getLogRouterTags()) {
// Check every version is larger than backup's startVersion continue;
for (auto& [uid, info] : self->backups) { }
if (info.allWorkerStarted) {
// update update progress so far // Check every version is larger than backup's startVersion
Version v = std::numeric_limits<Version>::max(); for (auto& [uid, info] : self->backups) {
for (const auto [tag, version] : tagVersions) { if (info.allWorkerStarted && finishedPreviousEpochs) {
v = std::min(v, version); // update update progress so far
} Version v = std::numeric_limits<Version>::max();
savedLogVersions.emplace(uid, v); for (const auto [tag, version] : tagVersions) {
continue; v = std::min(v, version);
} }
bool saved = true; savedLogVersions.emplace(uid, v);
for (const std::pair<Tag, Version> tv : tagVersions) { continue;
if (tv.second < info.startVersion) { }
saved = false; bool saved = true;
break; for (const std::pair<Tag, Version> tv : tagVersions) {
} if (tv.second < info.startVersion) {
} saved = false;
if (saved) { break;
ready.push_back(uid);
info.allWorkerStarted = true;
} }
} }
if (ready.empty() && savedLogVersions.empty()) continue; if (saved) {
ready.push_back(uid);
info.allWorkerStarted = true;
}
}
if (ready.empty() && savedLogVersions.empty()) continue;
// Set "allWorkerStarted" key for ready backups // Set "allWorkerStarted" and "latestBackupWorkerSavedVersion" key for backups
loop { loop {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx)); state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(self->cx));
try { try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE); tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state std::vector<Future<Optional<Value>>> readyValues; state std::vector<Future<Optional<Value>>> readyValues;
state std::vector<BackupConfig> configs; state std::vector<BackupConfig> configs;
for (UID uid : ready) { for (UID uid : ready) {
configs.emplace_back(uid); configs.emplace_back(uid);
readyValues.push_back(tr->get(configs.back().allWorkerStarted().key)); readyValues.push_back(tr->get(configs.back().allWorkerStarted().key));
}
state std::vector<Future<Optional<Version>>> prevVersions;
state std::vector<BackupConfig> versionConfigs;
for (const auto [uid, version] : savedLogVersions) {
versionConfigs.emplace_back(uid);
prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr));
}
wait(waitForAll(readyValues) && waitForAll(prevVersions));
for (int i = 0; i < readyValues.size(); i++) {
if (!readyValues[i].get().present()) {
configs[i].allWorkerStarted().set(tr, true);
TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", ready[i].toString());
}
}
for (int i = 0; i < prevVersions.size(); i++) {
const Version current = savedLogVersions[versionConfigs[i].getUid()];
if (prevVersions[i].get().present()) {
const Version prev = prevVersions[i].get().get();
TraceEvent(SevWarn, "BackupWorkerVersionInverse", self->myId)
.detail("Prev", prev)
.detail("Current", current);
}
if (!prevVersions[i].get().present() || prevVersions[i].get().get() < current) {
TraceEvent("BackupWorkerSetVersion", self->myId)
.detail("BackupID", versionConfigs[i].getUid())
.detail("Version", current);
versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current);
}
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
} }
state std::vector<Future<Optional<Version>>> prevVersions;
state std::vector<BackupConfig> versionConfigs;
for (const auto [uid, version] : savedLogVersions) {
versionConfigs.emplace_back(uid);
prevVersions.push_back(versionConfigs.back().latestBackupWorkerSavedVersion().get(tr));
}
wait(waitForAll(readyValues) && waitForAll(prevVersions));
for (int i = 0; i < readyValues.size(); i++) {
if (!readyValues[i].get().present()) {
configs[i].allWorkerStarted().set(tr, true);
TraceEvent("BackupWorkerSetReady", self->myId).detail("BackupID", ready[i].toString());
}
}
for (int i = 0; i < prevVersions.size(); i++) {
const Version current = savedLogVersions[versionConfigs[i].getUid()];
if (prevVersions[i].get().present()) {
const Version prev = prevVersions[i].get().get();
TraceEvent(SevWarn, "BackupWorkerVersionInverse", self->myId)
.detail("Prev", prev)
.detail("Current", current);
}
if (!prevVersions[i].get().present() || prevVersions[i].get().get() < current) {
TraceEvent("BackupWorkerSetVersion", self->myId)
.detail("BackupID", versionConfigs[i].getUid())
.detail("Version", current);
versionConfigs[i].latestBackupWorkerSavedVersion().set(tr, current);
}
}
wait(tr->commit());
break;
} catch (Error& e) {
wait(tr->onError(e));
} }
} }
} }