diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index b32631f9b3..b4526d012c 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -77,8 +77,8 @@ struct BackupData { AsyncVar> logSystem; Database cx; std::vector messages; - AsyncVar pullFinished; NotifiedVersion pulledVersion; + bool pulling = false; struct PerBackupInfo { PerBackupInfo() = default; @@ -119,7 +119,6 @@ struct BackupData { minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion), cc("BackupWorker", myId.toString()), pulledVersion(0) { cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true); - pullFinished.set(false); specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; }); specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; }); @@ -128,6 +127,18 @@ struct BackupData { "BackupWorkerMetrics"); } + bool pullFinished() const { + return endVersion.present() && pulledVersion.get() > endVersion.get(); + } + + bool allMessageSaved() const { + return endVersion.present() && savedVersion >= endVersion.get(); + } + + Version maxPopVersion() const { + return endVersion.present() ? endVersion.get() : minKnownCommittedVersion; + } + // Inserts a backup's single range into rangeMap. template void insertRange(KeyRangeMap>& keyRangeMap, KeyRangeRef range, T value) { @@ -525,7 +536,7 @@ ACTOR Future uploadData(BackupData* self) { state Version popVersion = invalidVersion; loop { - if (self->endVersion.present() && self->savedVersion >= self->endVersion.get()) { + if (self->allMessageSaved()) { self->messages.clear(); return Void(); } @@ -534,27 +545,24 @@ ACTOR Future uploadData(BackupData* self) { // lag TLog might have. Changing to 20s may fail consistency check. state Future uploadDelay = delay(10); - const Version maxPopVersion = - self->endVersion.present() ? self->endVersion.get() : self->minKnownCommittedVersion; state int numMsg = 0; + Version lastPopVersion = popVersion; if (self->messages.empty()) { // Even though messages is empty, we still want to advance popVersion. - popVersion = std::max(popVersion, maxPopVersion); + if (!self->endVersion.present()) { + popVersion = std::max(popVersion, self->minKnownCommittedVersion); + } else if (self->pullFinished()) { + popVersion = self->endVersion.get(); + } } else { for (const auto& message : self->messages) { - if (message.getVersion() > maxPopVersion) break; + if (message.getVersion() > self->maxPopVersion()) break; popVersion = std::max(popVersion, message.getVersion()); numMsg++; } } - if (self->pullFinished.get() && self->messages.empty()) { - // Advance popVersion to the endVersion to avoid gap between last - // message version and the endVersion. - popVersion = self->endVersion.get(); - } - if (numMsg > 0 || self->endVersion.present()) { + if (numMsg > 0 || (popVersion > lastPopVersion && self->pulling)) { // save an empty file for old epochs so that log file versions are continuous - TraceEvent("BackupWorkerSave", self->myId).detail("PopVersion", popVersion).detail("MsgQ", self->messages.size()); wait(saveMutationsToFile(self, popVersion, numMsg)); self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg); } @@ -569,8 +577,8 @@ ACTOR Future uploadData(BackupData* self) { self->pop(); } - if (!self->pullFinished.get()) { - wait(uploadDelay || self->pullFinished.onChange()); + if (!self->pullFinished()) { + wait(uploadDelay); } } } @@ -579,7 +587,7 @@ ACTOR Future uploadData(BackupData* self) { ACTOR Future pullAsyncData(BackupData* self) { state Future logSystemChange = Void(); state Reference r; - state Version tagAt = std::max(self->startVersion, self->savedVersion); + state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion)); TraceEvent("BackupWorkerPull", self->myId); loop { @@ -606,18 +614,15 @@ ACTOR Future pullAsyncData(BackupData* self) { } tagAt = r->version().version; - if (tagAt > self->pulledVersion.get()) { - self->pulledVersion.set(tagAt); - } + self->pulledVersion.set(tagAt); TraceEvent("BackupWorkerGot", self->myId).suppressFor(1.0).detail("V", tagAt); - if (self->endVersion.present() && tagAt > self->endVersion.get()) { + if (self->pullFinished()) { self->eraseMessagesAfterEndVersion(); TraceEvent("BackupWorkerFinishPull", self->myId) .detail("Tag", self->tag.toString()) .detail("VersionGot", tagAt) .detail("EndVersion", self->endVersion.get()) .detail("MsgQ", self->messages.size()); - self->pullFinished.set(true); return Void(); } wait(yield()); @@ -654,8 +659,12 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self) { Future stopped = monitorBackupStartedKeyChanges(self, false, true); pullFinished = pullAsyncData(self); + self->pulling = true; wait(success(stopped) || pullFinished); - if (pullFinished.isReady()) return Void(); // backup is done for some old epoch. + if (pullFinished.isReady()) { + self->pulling = false; + return Void(); // backup is done for some old epoch. + } // Even though the snapshot is done, mutation logs may not be written // out yet. We need to make usre mutations up to this point is written. @@ -675,6 +684,7 @@ ACTOR Future monitorBackupKeyOrPullData(BackupData* self) { } wait(self->pulledVersion.whenAtLeast(currentVersion)); pullFinished = Future(); // cancels pullAsyncData() + self->pulling = false; TraceEvent("BackupWorkerPaused", self->myId); } }