Limit the version range for old epochs
When the Master recruits a backup worker for a previous epoch, it may set the begin version to a very low number, because the backup progress for that epoch is not saved. This can cause problems for the log file, since these low versions have already been popped. The fix here is to advance savedVersion to the minimum starting version among the backups if that is higher than the begin version set by the Master. This is safe because these versions are not popped: if they had been popped, their progress would already have been recorded, and the Master would use a higher version than the backup's starting version.
This commit is contained in:
parent
5528857934
commit
76d90ac6d7
|
@ -74,7 +74,8 @@ struct BackupData {
|
||||||
const LogEpoch backupEpoch; // the epoch workers should pull mutations
|
const LogEpoch backupEpoch; // the epoch workers should pull mutations
|
||||||
LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull
|
LogEpoch oldestBackupEpoch = 0; // oldest epoch that still has data on tLogs for backup to pull
|
||||||
Version minKnownCommittedVersion;
|
Version minKnownCommittedVersion;
|
||||||
Version savedVersion;
|
Version savedVersion; // Largest version saved to blob storage
|
||||||
|
Version popVersion; // Largest version popped. Can be larger than savedVersion in NOOP mode.
|
||||||
AsyncVar<Reference<ILogSystem>> logSystem;
|
AsyncVar<Reference<ILogSystem>> logSystem;
|
||||||
Database cx;
|
Database cx;
|
||||||
std::vector<VersionedMessage> messages;
|
std::vector<VersionedMessage> messages;
|
||||||
|
@ -225,7 +226,7 @@ struct BackupData {
|
||||||
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
|
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
|
||||||
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
|
||||||
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
|
||||||
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1),
|
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
|
||||||
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) {
|
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) {
|
||||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
||||||
|
|
||||||
|
@ -291,7 +292,7 @@ struct BackupData {
|
||||||
}
|
}
|
||||||
ASSERT_WE_THINK(backupEpoch == oldestBackupEpoch);
|
ASSERT_WE_THINK(backupEpoch == oldestBackupEpoch);
|
||||||
const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass);
|
const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass);
|
||||||
logSystem.get()->pop(savedVersion, popTag);
|
logSystem.get()->pop(popVersion, popTag);
|
||||||
}
|
}
|
||||||
|
|
||||||
void stop() {
|
void stop() {
|
||||||
|
@ -326,11 +327,13 @@ struct BackupData {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool modified = false;
|
bool modified = false;
|
||||||
|
Version minVersion = std::numeric_limits<Version>::max();
|
||||||
for (const auto [uid, version] : uidVersions) {
|
for (const auto [uid, version] : uidVersions) {
|
||||||
auto it = backups.find(uid);
|
auto it = backups.find(uid);
|
||||||
if (it == backups.end()) {
|
if (it == backups.end()) {
|
||||||
modified = true;
|
modified = true;
|
||||||
backups.emplace(uid, BackupData::PerBackupInfo(this, uid, version));
|
backups.emplace(uid, BackupData::PerBackupInfo(this, uid, version));
|
||||||
|
minVersion = std::min(minVersion, version);
|
||||||
} else {
|
} else {
|
||||||
stopList.erase(uid);
|
stopList.erase(uid);
|
||||||
}
|
}
|
||||||
|
@ -342,6 +345,14 @@ struct BackupData {
|
||||||
it->second.stop();
|
it->second.stop();
|
||||||
modified = true;
|
modified = true;
|
||||||
}
|
}
|
||||||
|
if (backupEpoch < recruitedEpoch && savedVersion + 1 == startVersion) {
|
||||||
|
// Advance savedVersion to minimize version ranges in case backupEpoch's
|
||||||
|
// progress is not saved. Master may set a very low startVersion that
|
||||||
|
// is already popped. Advance the version is safe because these
|
||||||
|
// versions are not popped -- if they are popped, their progress should
|
||||||
|
// be already recorded and Master would use a higher version than minVersion.
|
||||||
|
savedVersion = std::max(minVersion, savedVersion);
|
||||||
|
}
|
||||||
if (modified) changedTrigger.trigger();
|
if (modified) changedTrigger.trigger();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,10 +401,10 @@ struct BackupData {
|
||||||
Future<Version> getMinKnownCommittedVersion() { return _getMinKnownCommittedVersion(this); }
|
Future<Version> getMinKnownCommittedVersion() { return _getMinKnownCommittedVersion(this); }
|
||||||
};
|
};
|
||||||
|
|
||||||
// Monitors "backupStartedKey". If "started" is true, wait until the key is set;
|
// Monitors "backupStartedKey". If "present" is true, wait until the key is set;
|
||||||
// otherwise, wait until the key is cleared. If "watch" is false, do not perform
|
// otherwise, wait until the key is cleared. If "watch" is false, do not perform
|
||||||
// the wait for key set/clear events. Returns if key present.
|
// the wait for key set/clear events. Returns if key present.
|
||||||
ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool started, bool watch) {
|
ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool present, bool watch) {
|
||||||
loop {
|
loop {
|
||||||
state ReadYourWritesTransaction tr(self->cx);
|
state ReadYourWritesTransaction tr(self->cx);
|
||||||
|
|
||||||
|
@ -418,13 +429,13 @@ ACTOR Future<bool> monitorBackupStartedKeyChanges(BackupData* self, bool started
|
||||||
}
|
}
|
||||||
self->exitEarly = shouldExit;
|
self->exitEarly = shouldExit;
|
||||||
self->onBackupChanges(uidVersions);
|
self->onBackupChanges(uidVersions);
|
||||||
if (started || !watch) return true;
|
if (present || !watch) return true;
|
||||||
} else {
|
} else {
|
||||||
TraceEvent("BackupWorkerEmptyStartKey", self->myId);
|
TraceEvent("BackupWorkerEmptyStartKey", self->myId);
|
||||||
self->onBackupChanges(uidVersions);
|
self->onBackupChanges(uidVersions);
|
||||||
|
|
||||||
self->exitEarly = shouldExit;
|
self->exitEarly = shouldExit;
|
||||||
if (!started || !watch) {
|
if (!present || !watch) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -762,6 +773,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||||
if (((numMsg > 0 || popVersion > lastPopVersion) && self->pulling) || self->pullFinished()) {
|
if (((numMsg > 0 || popVersion > lastPopVersion) && self->pulling) || self->pullFinished()) {
|
||||||
TraceEvent("BackupWorkerSave", self->myId)
|
TraceEvent("BackupWorkerSave", self->myId)
|
||||||
.detail("Version", popVersion)
|
.detail("Version", popVersion)
|
||||||
|
.detail("SavedVersion", self->savedVersion)
|
||||||
.detail("MsgQ", self->messages.size());
|
.detail("MsgQ", self->messages.size());
|
||||||
// save an empty file for old epochs so that log file versions are continuous
|
// save an empty file for old epochs so that log file versions are continuous
|
||||||
wait(saveMutationsToFile(self, popVersion, numMsg));
|
wait(saveMutationsToFile(self, popVersion, numMsg));
|
||||||
|
@ -769,7 +781,14 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If transition into NOOP mode, should clear messages
|
// If transition into NOOP mode, should clear messages
|
||||||
if (!self->pulling) self->messages.clear();
|
if (!self->pulling) {
|
||||||
|
self->messages.clear();
|
||||||
|
// Update popVersion so that save progress below can
|
||||||
|
// indicate ranges not used for future epochs.
|
||||||
|
if (self->popVersion > self->savedVersion && self->backupEpoch == self->recruitedEpoch) {
|
||||||
|
popVersion = std::max(popVersion, self->popVersion);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (popVersion > self->savedVersion) {
|
if (popVersion > self->savedVersion) {
|
||||||
wait(saveProgress(self, popVersion));
|
wait(saveProgress(self, popVersion));
|
||||||
|
@ -778,6 +797,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
|
||||||
.detail("Version", popVersion)
|
.detail("Version", popVersion)
|
||||||
.detail("MsgQ", self->messages.size());
|
.detail("MsgQ", self->messages.size());
|
||||||
self->savedVersion = std::max(popVersion, self->savedVersion);
|
self->savedVersion = std::max(popVersion, self->savedVersion);
|
||||||
|
self->popVersion = std::max(self->savedVersion, self->popVersion);
|
||||||
self->pop();
|
self->pop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -872,10 +892,13 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
|
||||||
when(wait(success(present))) { break; }
|
when(wait(success(present))) { break; }
|
||||||
when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
|
when(wait(success(committedVersion) || delay(SERVER_KNOBS->BACKUP_NOOP_POP_DELAY, self->cx->taskID))) {
|
||||||
if (committedVersion.isReady()) {
|
if (committedVersion.isReady()) {
|
||||||
self->savedVersion = std::max(committedVersion.get(), self->savedVersion);
|
self->popVersion =
|
||||||
|
std::max(self->popVersion, std::max(committedVersion.get(), self->savedVersion));
|
||||||
self->minKnownCommittedVersion =
|
self->minKnownCommittedVersion =
|
||||||
std::max(committedVersion.get(), self->minKnownCommittedVersion);
|
std::max(committedVersion.get(), self->minKnownCommittedVersion);
|
||||||
TraceEvent("BackupWorkerNoopPop", self->myId).detail("SavedVersion", self->savedVersion);
|
TraceEvent("BackupWorkerNoopPop", self->myId)
|
||||||
|
.detail("SavedVersion", self->savedVersion)
|
||||||
|
.detail("PopVersion", self->popVersion);
|
||||||
self->pop(); // Pop while the worker is in this NOOP state.
|
self->pop(); // Pop while the worker is in this NOOP state.
|
||||||
committedVersion = Never();
|
committedVersion = Never();
|
||||||
} else {
|
} else {
|
||||||
|
@ -884,6 +907,7 @@ ACTOR Future<Void> monitorBackupKeyOrPullData(BackupData* self, bool keyPresent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
ASSERT(!keyPresent == present.get());
|
||||||
keyPresent = !keyPresent;
|
keyPresent = !keyPresent;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue