diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 481bc2e20e..2d2a6fe259 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -74,7 +74,8 @@ struct BackupData { const LogEpoch backupEpoch; // the epoch workers should pull mutations LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull Version minKnownCommittedVersion; - Version savedVersion; + Version savedVersion; // Largest version saved to blob storage + Version popVersion; // Largest version popped. Can be larger than savedVersion in NOOP mode. AsyncVar> logSystem; Database cx; std::vector messages; @@ -225,7 +226,7 @@ struct BackupData { explicit BackupData(UID id, Reference> db, const InitializeBackupRequest& req) : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), - minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), + minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1), cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) { cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true); @@ -291,7 +292,7 @@ struct BackupData { } ASSERT_WE_THINK(backupEpoch == oldestBackupEpoch); const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass); - logSystem.get()->pop(savedVersion, popTag); + logSystem.get()->pop(popVersion, popTag); } void stop() { @@ -326,11 +327,13 @@ struct BackupData { } bool modified = false; + Version minVersion = std::numeric_limits::max(); for (const auto [uid, version] : uidVersions) { auto it = backups.find(uid); if (it == backups.end()) { modified = true; backups.emplace(uid, BackupData::PerBackupInfo(this, uid, version)); + minVersion = std::min(minVersion, version); } else { stopList.erase(uid); } @@ -342,6 +345,14 @@ struct BackupData { it->second.stop(); modified = true; } + if (backupEpoch < recruitedEpoch && savedVersion + 1 == startVersion) { + // Advance savedVersion to minimize version ranges in case backupEpoch's + // progress is not saved. Master may set a very low startVersion that + // is already popped. Advance the version is safe because these + // versions are not popped -- if they are popped, their progress should + // be already recorded and Master would use a higher version than minVersion. + savedVersion = std::max(minVersion, savedVersion); + } if (modified) changedTrigger.trigger(); } @@ -390,10 +401,10 @@ struct BackupData { Future getMinKnownCommittedVersion() { return _getMinKnownCommittedVersion(this); } }; -// Monitors "backupStartedKey". If "started" is true, wait until the key is set; +// Monitors "backupStartedKey". If "present" is true, wait until the key is set; // otherwise, wait until the key is cleared. If "watch" is false, do not perform // the wait for key set/clear events. Returns if key present. -ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool started, bool watch) { +ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool present, bool watch) { loop { state ReadYourWritesTransaction tr(self->cx); @@ -418,13 +429,13 @@ ACTOR Future monitorBackupStartedKeyChanges(BackupData* self, bool started } self->exitEarly = shouldExit; self->onBackupChanges(uidVersions); - if (started || !watch) return true; + if (present || !watch) return true; } else { TraceEvent("BackupWorkerEmptyStartKey", self->myId); self->onBackupChanges(uidVersions); self->exitEarly = shouldExit; - if (!started || !watch) { + if (!present || !watch) { return false; } } @@ -762,6 +773,7 @@ ACTOR Future uploadData(BackupData* self) { if (((numMsg > 0 || popVersion > lastPopVersion) && self->pulling) || self->pullFinished()) { TraceEvent("BackupWorkerSave", self->myId) .detail("Version", popVersion) + .detail("SavedVersion", self->savedVersion) .detail("MsgQ", self->messages.size()); // save an empty file for old epochs so that log file versions are continuous wait(saveMutationsToFile(self, popVersion, numMsg)); @@ -769,7 +781,14 @@ ACTOR Future uploadData(BackupData* self) { } // If transition into NOOP mode, should clear messages - if (!self->pulling) self->messages.clear(); + if (!self->pulling) { + self->messages.clear(); + // Update popVersion so that save progress below can + // indicate ranges not used for future epochs. + if (self->popVersion > self->savedVersion && self->backupEpoch == self->recruitedEpoch) { + popVersion = std::max(popVersion, self->popVersion); + } + } if (popVersion > self->savedVersion) { wait(saveProgress(self, popVersion)); @@ -778,6 +797,7 @@ ACTOR Future uploadData(BackupData* self) { .detail("Version", popVersion) .detail("MsgQ", self->messages.size()); self->savedVersion = std::max(popVersion, self->savedVersion); + self->popVersion = std::max(self->savedVersion, self->popVersion); self->pop(); } @@ -872,10 +892,13 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) when(wait(success(present))) { break; } when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) { if (committedVersion.isReady()) { - self->savedVersion = std::max(committedVersion.get(), self->savedVersion); + self->popVersion = + std::max(self->popVersion, std::max(committedVersion.get(), self->savedVersion)); self->minKnownCommittedVersion = std::max(committedVersion.get(), self->minKnownCommittedVersion); - TraceEvent("BackupWorkerNoopPop", self->myId).detail("SavedVersion", self->savedVersion); + TraceEvent("BackupWorkerNoopPop", self->myId) + .detail("SavedVersion", self->savedVersion) + .detail("PopVersion", self->popVersion); self->pop(); // Pop while the worker is in this NOOP state. committedVersion = Never(); } else { @@ -884,6 +907,7 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self, bool keyPresent) } } } + ASSERT(!keyPresent == present.get()); keyPresent = !keyPresent; } }