Filter out mutations before the true-up version

When a mutation log's begin version is true-uped, we must filter out mutations
less than such a version.
This commit is contained in:
Jingyu Zhou 2020-05-15 20:06:47 -07:00
parent 89ae1200dd
commit caca31d05a
1 changed files with 17 additions and 7 deletions

View File

@ -668,6 +668,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
state std::vector<Reference<IBackupFile>> logFiles;
state std::vector<int64_t> blockEnds;
state std::vector<UID> activeUids; // active Backups' UIDs
state std::vector<Version> beginVersions; // logFiles' begin versions
state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
state std::vector<Standalone<StringRef>> mutations;
state int idx;
@ -686,18 +687,22 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
const int index = logFileFutures.size();
activeUids.push_back(it->first);
self->insertRanges(keyRangeMap, it->second.ranges.get(), index);
if (it->second.lastSavedVersion == invalidVersion) {
if (it->second.startVersion > self->startVersion && !self->messages.empty()) {
// True-up first mutation log's begin version
it->second.lastSavedVersion = self->messages[0].getVersion();
} else {
it->second.lastSavedVersion =
std::max(self->popVersion, std::max(self->savedVersion, self->startVersion));
it->second.lastSavedVersion = std::max({ self->popVersion, self->savedVersion, self->startVersion });
}
TraceEvent("BackupWorkerTrueUp", self->myId).detail("LastSavedVersion", it->second.lastSavedVersion);
}
// The true-up version can be larger than first message version, so keep
// the begin versions for later muation filtering.
beginVersions.push_back(it->second.lastSavedVersion);
logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
// ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion);
it++;
}
@ -707,7 +712,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
std::transform(logFileFutures.begin(), logFileFutures.end(), std::back_inserter(logFiles),
[](const Future<Reference<IBackupFile>>& f) { return f.get(); });
ASSERT(activeUids.size() == logFiles.size());
ASSERT(activeUids.size() == logFiles.size() && beginVersions.size() == logFiles.size());
for (int i = 0; i < logFiles.size(); i++) {
TraceEvent("OpenMutationFile", self->myId)
.detail("BackupID", activeUids[i])
@ -732,7 +737,10 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
std::vector<Future<Void>> adds;
if (m.type != MutationRef::Type::ClearRange) {
for (int index : keyRangeMap[m.param1]) {
adds.push_back(addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
if (message.getVersion() >= beginVersions[index]) {
adds.push_back(
addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
}
}
} else {
KeyRangeRef mutationRange(m.param1, m.param2);
@ -747,8 +755,10 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
wr << subm;
mutations.push_back(wr.toValue());
for (int index : range.value()) {
adds.push_back(
addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
if (message.getVersion() >= beginVersions[index]) {
adds.push_back(
addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
}
}
}
}