Consider previously pulled version for pulling version

Saving files only happens if we are not pulling, i.e., not in NOOP mode.
This commit is contained in:
Jingyu Zhou 2020-03-02 13:29:42 -08:00
parent de9362748e
commit 96eab2f3ec
1 changed files with 33 additions and 23 deletions

View File

@ -77,8 +77,8 @@ struct BackupData {
AsyncVar<Reference<ILogSystem>> logSystem;
Database cx;
std::vector<VersionedMessage> messages;
AsyncVar<bool> pullFinished;
NotifiedVersion pulledVersion;
bool pulling = false;
struct PerBackupInfo {
PerBackupInfo() = default;
@ -119,7 +119,6 @@ struct BackupData {
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion), cc("BackupWorker", myId.toString()),
pulledVersion(0) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
pullFinished.set(false);
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
@ -128,6 +127,18 @@ struct BackupData {
"BackupWorkerMetrics");
}
bool pullFinished() const {
return endVersion.present() && pulledVersion.get() > endVersion.get();
}
bool allMessageSaved() const {
return endVersion.present() && savedVersion >= endVersion.get();
}
Version maxPopVersion() const {
return endVersion.present() ? endVersion.get() : minKnownCommittedVersion;
}
// Inserts a backup's single range into rangeMap.
template <class T>
void insertRange(KeyRangeMap<std::set<T>>& keyRangeMap, KeyRangeRef range, T value) {
@ -525,7 +536,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
state Version popVersion = invalidVersion;
loop {
if (self->endVersion.present() && self->savedVersion >= self->endVersion.get()) {
if (self->allMessageSaved()) {
self->messages.clear();
return Void();
}
@ -534,27 +545,24 @@ ACTOR Future<Void> uploadData(BackupData* self) {
// lag TLog might have. Changing to 20s may fail consistency check.
state Future<Void> uploadDelay = delay(10);
const Version maxPopVersion =
self->endVersion.present() ? self->endVersion.get() : self->minKnownCommittedVersion;
state int numMsg = 0;
Version lastPopVersion = popVersion;
if (self->messages.empty()) {
// Even though messages is empty, we still want to advance popVersion.
popVersion = std::max(popVersion, maxPopVersion);
if (!self->endVersion.present()) {
popVersion = std::max(popVersion, self->minKnownCommittedVersion);
} else if (self->pullFinished()) {
popVersion = self->endVersion.get();
}
} else {
for (const auto& message : self->messages) {
if (message.getVersion() > maxPopVersion) break;
if (message.getVersion() > self->maxPopVersion()) break;
popVersion = std::max(popVersion, message.getVersion());
numMsg++;
}
}
if (self->pullFinished.get() && self->messages.empty()) {
// Advance popVersion to the endVersion to avoid gap between last
// message version and the endVersion.
popVersion = self->endVersion.get();
}
if (numMsg > 0 || self->endVersion.present()) {
if (numMsg > 0 || (popVersion > lastPopVersion && self->pulling)) {
// save an empty file for old epochs so that log file versions are continuous
TraceEvent("BackupWorkerSave", self->myId).detail("PopVersion", popVersion).detail("MsgQ", self->messages.size());
wait(saveMutationsToFile(self, popVersion, numMsg));
self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
}
@ -569,8 +577,8 @@ ACTOR Future<Void> uploadData(BackupData* self) {
self->pop();
}
if (!self->pullFinished.get()) {
wait(uploadDelay || self->pullFinished.onChange());
if (!self->pullFinished()) {
wait(uploadDelay);
}
}
}
@ -579,7 +587,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
ACTOR Future<Void> pullAsyncData(BackupData* self) {
state Future<Void> logSystemChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = std::max(self->startVersion, self->savedVersion);
state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion));
TraceEvent("BackupWorkerPull", self->myId);
loop {
@ -606,18 +614,15 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
}
tagAt = r->version().version;
if (tagAt > self->pulledVersion.get()) {
self->pulledVersion.set(tagAt);
}
self->pulledVersion.set(tagAt);
TraceEvent("BackupWorkerGot", self->myId).suppressFor(1.0).detail("V", tagAt);
if (self->endVersion.present() && tagAt > self->endVersion.get()) {
if (self->pullFinished()) {
self->eraseMessagesAfterEndVersion();
TraceEvent("BackupWorkerFinishPull", self->myId)
.detail("Tag", self->tag.toString())
.detail("VersionGot", tagAt)
.detail("EndVersion", self->endVersion.get())
.detail("MsgQ", self->messages.size());
self->pullFinished.set(true);
return Void();
}
wait(yield());
@ -654,8 +659,12 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self) {
Future<bool> stopped = monitorBackupStartedKeyChanges(self, false, true);
pullFinished = pullAsyncData(self);
self->pulling = true;
wait(success(stopped) || pullFinished);
if (pullFinished.isReady()) return Void(); // backup is done for some old epoch.
if (pullFinished.isReady()) {
self->pulling = false;
return Void(); // backup is done for some old epoch.
}
// Even though the snapshot is done, mutation logs may not be written
// out yet. We need to make usre mutations up to this point is written.
@ -675,6 +684,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self) {
}
wait(self->pulledVersion.whenAtLeast(currentVersion));
pullFinished = Future<Void>(); // cancels pullAsyncData()
self->pulling = false;
TraceEvent("BackupWorkerPaused", self->myId);
}
}