Backup worker should aggressively advance versions
Separate popping logic into an actor with shorter interval than the upload interval. More critically, even if there is no mutations (e.g., in quiet database period), the popped version should still be advanced.
This commit is contained in:
parent
6c6a553dcc
commit
d3f14699c4
|
@ -36,7 +36,7 @@ struct BackupData {
|
||||||
const LogEpoch recruitedEpoch;
|
const LogEpoch recruitedEpoch;
|
||||||
const LogEpoch backupEpoch;
|
const LogEpoch backupEpoch;
|
||||||
Version minKnownCommittedVersion;
|
Version minKnownCommittedVersion;
|
||||||
Version poppedVersion;
|
Version savedVersion, lastSeenVersion;
|
||||||
AsyncVar<Reference<ILogSystem>> logSystem;
|
AsyncVar<Reference<ILogSystem>> logSystem;
|
||||||
Database cx;
|
Database cx;
|
||||||
std::vector<TagsAndMessage> messages;
|
std::vector<TagsAndMessage> messages;
|
||||||
|
@ -51,10 +51,11 @@ struct BackupData {
|
||||||
: myId(id), tag(req.routerTag), startVersion(req.startVersion),
|
: myId(id), tag(req.routerTag), startVersion(req.startVersion),
|
||||||
endVersion(req.endVersion.present() ? req.endVersion.get() : std::numeric_limits<Version>::max()),
|
endVersion(req.endVersion.present() ? req.endVersion.get() : std::numeric_limits<Version>::max()),
|
||||||
recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
|
recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
|
||||||
poppedVersion(invalidVersion), version(req.startVersion - 1), cc("BackupWorker", id.toString()) {
|
savedVersion(invalidVersion), lastSeenVersion(invalidVersion), version(req.startVersion - 1),
|
||||||
|
cc("BackupWorker", id.toString()) {
|
||||||
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
|
||||||
|
|
||||||
specialCounter(cc, "PoppedVersion", [this]() { return this->poppedVersion; });
|
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
|
||||||
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
|
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
|
||||||
logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
|
logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
|
||||||
"BackupWorkerMetrics");
|
"BackupWorkerMetrics");
|
||||||
|
@ -80,38 +81,51 @@ ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Uploads self->messages to cloud storage.
|
// Uploads self->messages to cloud storage and updates poppedVersion.
|
||||||
ACTOR Future<Void> uploadData(BackupData* self) {
|
ACTOR Future<Void> uploadData(BackupData* self) {
|
||||||
state Version popVersion = invalidVersion;
|
state Version popVersion = invalidVersion;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
ASSERT(self->messages.size() == self->versions.size());
|
ASSERT(self->messages.size() == self->versions.size());
|
||||||
while (!self->messages.empty()) {
|
if (self->savedVersion >= self->endVersion) {
|
||||||
// if (self->versions[0] > self->minKnownCommittedVersion) break;
|
|
||||||
popVersion = std::max(popVersion, self->versions[0]);
|
|
||||||
// TODO: consume the messages
|
|
||||||
self->messages.erase(self->messages.begin());
|
|
||||||
self->versions.erase(self->versions.begin());
|
|
||||||
}
|
|
||||||
// TODO: upload messages
|
|
||||||
Future<Void> savedProgress = Void();
|
|
||||||
if (self->logSystem.get() && popVersion > self->poppedVersion) {
|
|
||||||
savedProgress = saveProgress(self, popVersion);
|
|
||||||
}
|
|
||||||
wait(delay(30) && savedProgress); // TODO: knobify the delay of 30s
|
|
||||||
if (self->logSystem.get() && popVersion > self->poppedVersion) {
|
|
||||||
const Tag popTag = self->logSystem.get()->getPseudoPopTag(self->tag, ProcessClass::BackupClass);
|
|
||||||
self->logSystem.get()->pop(popVersion, popTag);
|
|
||||||
self->poppedVersion = popVersion;
|
|
||||||
TraceEvent("BackupWorkerPop", self->myId)
|
|
||||||
.detail("V", popVersion)
|
|
||||||
.detail("Tag", self->tag.toString())
|
|
||||||
.detail("PopTag", popTag.toString());
|
|
||||||
}
|
|
||||||
if (self->poppedVersion >= self->endVersion) {
|
|
||||||
self->backupDone.trigger();
|
self->backupDone.trigger();
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (self->messages.empty()) {
|
||||||
|
// Even though messages is empty, we still want to advance popVersion.
|
||||||
|
popVersion = std::max(popVersion, self->lastSeenVersion);
|
||||||
|
} else {
|
||||||
|
while (!self->messages.empty()) {
|
||||||
|
// if (self->versions[0] > self->minKnownCommittedVersion) break;
|
||||||
|
popVersion = std::max(popVersion, self->versions[0]);
|
||||||
|
// TODO: consume the messages
|
||||||
|
self->messages.erase(self->messages.begin());
|
||||||
|
self->versions.erase(self->versions.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: upload messages
|
||||||
|
|
||||||
|
state Future<Void> savedProgress = Never();
|
||||||
|
if (popVersion > self->savedVersion) {
|
||||||
|
savedProgress = saveProgress(self, popVersion);
|
||||||
|
}
|
||||||
|
// TODO: knobify the delay of 20s. This delay is sensitive, as it is the
|
||||||
|
// lag TLog might have.
|
||||||
|
state Future<Void> uploadDelay = delay(20);
|
||||||
|
|
||||||
|
loop choose {
|
||||||
|
when(wait(uploadDelay)) {
|
||||||
|
// Note the progress may not be saved yet. Cancel it?
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
when(wait(savedProgress)) {
|
||||||
|
TraceEvent("BackupWorkerSavedProgress", self->myId).detail("Version", popVersion);
|
||||||
|
self->savedVersion = std::max(popVersion, self->savedVersion);
|
||||||
|
savedProgress = Never(); // Still wait until uploadDelay expires
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,8 +163,22 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tagAt = std::max(r->version().version, lastVersion);
|
tagAt = std::max(r->version().version, lastVersion);
|
||||||
|
self->lastSeenVersion = std::max(tagAt, self->lastSeenVersion);
|
||||||
TraceEvent("BackupWorkerGot", self->myId).suppressFor(1.0).detail("V", tagAt);
|
TraceEvent("BackupWorkerGot", self->myId).suppressFor(1.0).detail("V", tagAt);
|
||||||
if (tagAt > self->endVersion) return Void();
|
if (tagAt > self->endVersion) return Void();
|
||||||
|
wait(yield());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We need to pop data faster than the upload interval to avoid accumulate
|
||||||
|
// mutations in TLogs.
|
||||||
|
ACTOR Future<Void> popData(BackupData* self) {
|
||||||
|
loop {
|
||||||
|
if (self->logSystem.get()) {
|
||||||
|
const Tag popTag = self->logSystem.get()->getPseudoPopTag(self->tag, ProcessClass::BackupClass);
|
||||||
|
self->logSystem.get()->pop(self->savedVersion, popTag);
|
||||||
|
}
|
||||||
|
wait(delay(2.0)); // TODO: knobify this delay
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +192,7 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch r
|
||||||
if (isDisplaced) {
|
if (isDisplaced) {
|
||||||
TraceEvent("BackupWorkerDisplaced", self->myId)
|
TraceEvent("BackupWorkerDisplaced", self->myId)
|
||||||
.detail("RecoveryCount", recoveryCount)
|
.detail("RecoveryCount", recoveryCount)
|
||||||
.detail("PoppedVersion", self->poppedVersion)
|
.detail("SavedVersion", self->savedVersion)
|
||||||
.detail("DBRecoveryCount", db->get().recoveryCount)
|
.detail("DBRecoveryCount", db->get().recoveryCount)
|
||||||
.detail("RecoveryState", (int)db->get().recoveryState);
|
.detail("RecoveryState", (int)db->get().recoveryState);
|
||||||
throw worker_removed();
|
throw worker_removed();
|
||||||
|
@ -184,11 +212,13 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
|
||||||
TraceEvent("BackupWorkerStart", interf.id())
|
TraceEvent("BackupWorkerStart", interf.id())
|
||||||
.detail("Tag", req.routerTag.toString())
|
.detail("Tag", req.routerTag.toString())
|
||||||
.detail("StartVersion", req.startVersion)
|
.detail("StartVersion", req.startVersion)
|
||||||
|
.detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
|
||||||
.detail("LogEpoch", req.recruitedEpoch)
|
.detail("LogEpoch", req.recruitedEpoch)
|
||||||
.detail("BackupEpoch", req.backupEpoch);
|
.detail("BackupEpoch", req.backupEpoch);
|
||||||
try {
|
try {
|
||||||
addActor.send(pullAsyncData(&self));
|
addActor.send(pullAsyncData(&self));
|
||||||
addActor.send(uploadData(&self));
|
addActor.send(uploadData(&self));
|
||||||
|
addActor.send(popData(&self));
|
||||||
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
|
addActor.send(waitFailureServer(interf.waitFailure.getFuture()));
|
||||||
|
|
||||||
loop choose {
|
loop choose {
|
||||||
|
|
Loading…
Reference in New Issue