Fix backup worker not popping up to end version

Previously, the pop version is the min of minKnownCommittedVersion and
endVersion. In the case of backup worker for previous epoch, the endVersion
should be used.
This commit is contained in:
Jingyu Zhou 2019-09-21 21:55:19 -07:00
parent 40436a4e78
commit 297da14aba
1 changed files with 51 additions and 46 deletions

View File

@ -45,7 +45,7 @@ struct BackupData {
const UID myId;
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
const Version startVersion;
const Version endVersion; // old epoch's end version (inclusive), or numeric limit for current epoch
const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
const LogEpoch recruitedEpoch;
const LogEpoch backupEpoch;
Version minKnownCommittedVersion;
@ -54,16 +54,17 @@ struct BackupData {
Database cx;
std::vector<VersionedMessage> messages;
Reference<IBackupContainer> container;
AsyncVar<bool> pullFinished;
CounterCollection cc;
Future<Void> logger;
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), startVersion(req.startVersion),
endVersion(req.endVersion.present() ? req.endVersion.get() : std::numeric_limits<Version>::max()),
: myId(id), tag(req.routerTag), startVersion(req.startVersion), endVersion(req.endVersion),
recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
savedVersion(invalidVersion), lastSeenVersion(invalidVersion), cc("BackupWorker", id.toString()) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
pullFinished.set(false);
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
@ -118,15 +119,15 @@ ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
// Returns true if the message is a mutation that should be backuped, i.e.,
// either key is not in system key space or is not a metadataVersionKey.
bool isBackupMessage(const VersionedMessage& msg) {
// std::cout << msg.message.printable() << std::endl;
// std::cout << "Tags: " << describe(msg.tags) << ", Version: " << msg.version.toString() << std::endl;
for (Tag tag : msg.tags) {
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
return false; // skip Txs mutations
}
}
// std::cout << m.toString() << std::endl;
// std::cout << msg.message.printable() << std::endl;
// std::cout << "Tags: " << describe(msg.tags) << ", Version: " << msg.version.toString() << std::endl;
BinaryReader reader(msg.message.begin(), msg.message.size(), AssumeVersion(currentProtocolVersion));
// Return false for LogProtocolMessage.
@ -134,22 +135,45 @@ bool isBackupMessage(const VersionedMessage& msg) {
MutationRef m;
reader >> m;
// std::cout << m.toString() << std::endl;
// check for metadataVersionKey and special metadata mutations
if (!normalKeys.contains(m.param1) && m.param1 != metadataVersionKey) {
return false;
}
// std::cout << "BK: " << msg.version.version << " " << m.toString() << std::endl;
return true;
}
// Saves messages in the range of [0, numMsg) to a file and then remove these
// messages.
ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int numMsg) {
const int blockSize = 1 << 20;
state Reference<IBackupFile> logFile =
wait(self->container->writeLogFile(self->messages[0].getVersion(), popVersion, blockSize));
state int idx = 0;
for (; idx < numMsg; idx++) {
// TODO: Endianness for version.version & version.sub
if (!isBackupMessage(self->messages[idx])) continue;
wait(logFile->append((void*)&self->messages[idx].version.version, sizeof(Version)));
wait(logFile->append((void*)&self->messages[idx].version.sub, sizeof(int32_t)));
wait(logFile->append(self->messages[idx].message.begin(), self->messages[idx].message.size()));
}
self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
wait(logFile->finish());
// TODO: save this somewhere with tag info.
// std::string logFilename = logFile->getFileName();
return Void();
}
// Uploads self->messages to cloud storage and updates poppedVersion.
ACTOR Future<Void> uploadData(BackupData* self) {
state Version popVersion = invalidVersion;
loop {
if (self->savedVersion >= self->endVersion) {
if (self->endVersion.present() && self->savedVersion >= self->endVersion.get()) {
self->messages.clear();
return Void();
}
@ -157,54 +181,33 @@ ACTOR Future<Void> uploadData(BackupData* self) {
// lag TLog might have. Changing to 20s may fail consistency check.
state Future<Void> uploadDelay = delay(10);
state std::string logFilename;
if (self->messages.empty()) {
// Even though messages is empty, we still want to advance popVersion.
popVersion = std::max(popVersion, self->lastSeenVersion);
} else {
state int numMsg = 0;
for (auto it = self->messages.begin(); it < self->messages.end(); it++) {
if (it->getVersion() > std::min(self->endVersion, self->minKnownCommittedVersion)) break;
popVersion = std::max(popVersion, it->getVersion());
const Version maxPopVersion =
self->endVersion.present() ? self->endVersion.get() : self->minKnownCommittedVersion;
int numMsg = 0;
for (const auto& message : self->messages) {
if (message.getVersion() > maxPopVersion) break;
popVersion = std::max(popVersion, message.getVersion());
numMsg++;
}
if (numMsg > 0) {
// Upload messages
const int blockSize = 1 << 20;
state Reference<IBackupFile> logFile =
wait(self->container->writeLogFile(self->messages[0].getVersion(), popVersion, blockSize));
state int idx = 0;
for (; idx < numMsg; idx++) {
// TODO: Endianness for version.version & version.sub
if (!isBackupMessage(self->messages[idx])) continue;
wait(logFile->append((void*)&self->messages[idx].version.version, sizeof(Version)));
wait(logFile->append((void*)&self->messages[idx].version.sub, sizeof(int32_t)));
wait(logFile->append(self->messages[idx].message.begin(), self->messages[idx].message.size()));
}
self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
wait(logFile->finish());
logFilename = logFile->getFileName(); // TODO: save this somewhere with tag info.
wait(saveMutationsToFile(self, popVersion, numMsg));
}
}
state Future<Void> savedProgress = Never();
if (popVersion > self->savedVersion) {
savedProgress = saveProgress(self, popVersion);
wait(saveProgress(self, popVersion));
TraceEvent("BackupWorkerSavedProgress", self->myId)
.detail("Version", popVersion)
.detail("MsgQ", self->messages.size());
self->savedVersion = std::max(popVersion, self->savedVersion);
self->pop();
}
loop choose {
when(wait(uploadDelay)) {
// Note the progress may not be saved yet. Cancel it?
break;
}
when(wait(savedProgress)) {
TraceEvent("BackupWorkerSavedProgress", self->myId).detail("Version", popVersion);
self->savedVersion = std::max(popVersion, self->savedVersion);
self->pop();
savedProgress = Never(); // Still wait until uploadDelay expires
}
}
wait(uploadDelay || self->pullFinished.onChange());
}
}
@ -241,10 +244,12 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
tagAt = std::max(r->version().version, lastVersion);
self->lastSeenVersion = std::max(tagAt, self->lastSeenVersion);
TraceEvent("BackupWorkerGot", self->myId).suppressFor(1.0).detail("V", tagAt);
if (tagAt > self->endVersion) {
if (self->endVersion.present() && tagAt > self->endVersion.get()) {
TraceEvent("BackupWorkerFinishPull", self->myId)
.detail("VersionGot", tagAt)
.detail("EndVersion", self->endVersion);
.detail("EndVersion", self->endVersion.get())
.detail("MsgQ", self->messages.size());
self->pullFinished.set(true);
return Void();
}
wait(yield());
@ -307,7 +312,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
// Notify master so that this worker can be removed from log system, then this
// worker (for an old epoch's unfinished work) can safely exit.
wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
BackupWorkerDoneRequest(self.myId, self.endVersion))));
BackupWorkerDoneRequest(self.myId, self.endVersion.get()))));
break;
}
when(wait(error)) {}