From 76d90ac6d7bd8ae37bb9bd0ccf24897ffba2e173 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 17 Apr 2020 22:55:09 -0700 Subject: [PATCH] Limit the version range for old epochs When the Master recruits a backup worker for previous epochs, the Master may set the begin version to a very low number, because the backup progress for that epoch is not saved. This can cause problem for the log file, since these low versions have been popped. The fix here is to advance savedVersion to the minimum of backup's starting version if it is higher than the begin version set by the Master. This is safe because these versions are not popped. If they are popped, their progress should already be recorded and Master would use a higher version than the backup's starting version. --- fdbserver/BackupWorker.actor.cpp | 44 ++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 481bc2e20e..2d2a6fe259 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -74,7 +74,8 @@ struct BackupData { const LogEpoch backupEpoch; // the epoch workers should pull mutations LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull Version minKnownCommittedVersion; - Version savedVersion; + Version savedVersion; // Largest version saved to blob storage + Version popVersion; // Largest version popped. Can be larger than savedVersion in NOOP mode. AsyncVar> logSystem; Database cx; std::vector messages; @@ -225,7 +226,7 @@ struct BackupData { explicit BackupData(UID id, Reference> db, const InitializeBackupRequest& req) : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), - minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), + minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1), cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) { cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true); @@ -291,7 +292,7 @@ struct BackupData { } ASSERT_WE_THINK(backupEpoch == oldestBackupEpoch); const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass); - logSystem.get()->pop(savedVersion, popTag); + logSystem.get()->pop(popVersion, popTag); } void stop() { @@ -326,11 +327,13 @@ struct BackupData { } bool modified = false; + Version minVersion = std::numeric_limits::max(); for (const auto [uid, version] : uidVersions) { auto it = backups.find(uid); if (it == backups.end()) { modified = true; backups.emplace(uid, BackupData::PerBackupInfo(this, uid, version)); + minVersion = std::min(minVersion, version); } else { stopList.erase(uid); } @@ -342,6 +345,14 @@ struct BackupData { it->second.stop(); modified = true; } + if (backupEpoch < recruitedEpoch && savedVersion + 1 == startVersion) { + // Advance savedVersion to minimize version ranges in case backupEpoch's + // progress is not saved. Master may set a very low startVersion that + // is already popped. Advance the version is safe because these + // versions are not popped -- if they are popped, their progress should + // be already recorded and Master would use a higher version than minVersion. + savedVersion = std::max(minVersion, savedVersion); + } if (modified) changedTrigger.trigger(); } @@ -390,10 +401,10 @@ struct BackupData { Future getMinKnownCommittedVersion() { return _getMinKnownCommittedVersion(this); } }; -// Monitors "backupStartedKey". If "started" is true, wait until the key is set; +// Monitors "backupStartedKey". If "present" is true, wait until the key is set; // otherwise, wait until the key is cleared. If "watch" is false, do not perform // the wait for key set/clear events. Returns if key present. -ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool started, bool watch) { +ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool present, bool watch) { loop { state ReadYourWritesTransaction tr(self->cx); @@ -418,13 +429,13 @@ ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool started } self->exitEarly = shouldExit; self->onBackupChanges(uidVersions); - if (started || !watch) return true; + if (present || !watch) return true; } else { TraceEvent("BackupWorkerEmptyStartKey", self->myId); self->onBackupChanges(uidVersions); self->exitEarly = shouldExit; - if (!started || !watch) { + if (!present || !watch) { return false; } } @@ -762,6 +773,7 @@ ACTOR Future uploadData(BackupData* self) { if (((numMsg > 0 || popVersion > lastPopVersion) && self->pulling) || self->pullFinished()) { TraceEvent("BackupWorkerSave", self->myId) .detail("Version", popVersion) + .detail("SavedVersion", self->savedVersion) .detail("MsgQ", self->messages.size()); // save an empty file for old epochs so that log file versions are continuous wait(saveMutationsToFile(self, popVersion, numMsg)); @@ -769,7 +781,14 @@ ACTOR Future uploadData(BackupData* self) { } // If transition into NOOP mode, should clear messages - if (!self->pulling) self->messages.clear(); + if (!self->pulling) { + self->messages.clear(); + // Update popVersion so that save progress below can + // indicate ranges not used for future epochs. + if (self->popVersion > self->savedVersion && self->backupEpoch == self->recruitedEpoch) { + popVersion = std::max(popVersion, self->popVersion); + } + } if (popVersion > self->savedVersion) { wait(saveProgress(self, popVersion)); @@ -778,6 +797,7 @@ ACTOR Future uploadData(BackupData* self) { .detail("Version", popVersion) .detail("MsgQ", self->messages.size()); self->savedVersion = std::max(popVersion, self->savedVersion); + self->popVersion = std::max(self->savedVersion, self->popVersion); self->pop(); } @@ -872,10 +892,13 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) when(wait(success(present))) { break; } when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) { if (committedVersion.isReady()) { - self->savedVersion = std::max(committedVersion.get(), self->savedVersion); + self->popVersion = + std::max(self->popVersion, std::max(committedVersion.get(), self->savedVersion)); self->minKnownCommittedVersion = std::max(committedVersion.get(), self->minKnownCommittedVersion); - TraceEvent("BackupWorkerNoopPop", self->myId).detail("SavedVersion", self->savedVersion); + TraceEvent("BackupWorkerNoopPop", self->myId) + .detail("SavedVersion", self->savedVersion) + .detail("PopVersion", self->popVersion); self->pop(); // Pop while the worker is in this NOOP state. committedVersion = Never(); } else { @@ -884,6 +907,7 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) } } } + ASSERT(!keyPresent == present.get()); keyPresent = !keyPresent; } }