Include a total number of tags in partition log file names

This is needed for BackupContainer to check partitioned mutation logs are
continuous, i.e., restorable to a version.
This commit is contained in:
Jingyu Zhou 2020-02-20 16:28:27 -08:00
parent c2623b5c20
commit d8c6bf585d
7 changed files with 38 additions and 27 deletions

View File

@ -344,10 +344,11 @@ public:
}
Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
uint16_t tagId) final {
uint16_t tagId, int totalTags) final {
return writeFile(logVersionFolderString(beginVersion, true) +
format("log,%lld,%lld,%s,%d,%d", beginVersion, endVersion,
deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId));
format("log,%lld,%lld,%s,%d,%d-of-%d", beginVersion, endVersion,
deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId,
totalTags));
}
Future<Reference<IBackupFile>> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount, Version fileVersion, int blockSize) override {
@ -398,8 +399,8 @@ public:
if(sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size()) {
out = f;
return true;
} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d%n", &f.beginVersion, &f.endVersion,
&f.blockSize, &f.tagId, &len) == 4 &&
} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d-of-%d%n", &f.beginVersion,
&f.endVersion, &f.blockSize, &f.tagId, &f.totalTags, &len) == 5 &&
len == name.size() && f.tagId >= 0) {
out = f;
return true;
@ -488,7 +489,6 @@ public:
ACTOR static Future<Void> writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem> bc, std::vector<std::string> fileNames, int64_t totalBytes) {
ASSERT(!fileNames.empty());
state Version minVer = std::numeric_limits<Version>::max();
state Version maxVer = 0;
state RangeFile rf;
@ -528,7 +528,7 @@ public:
return Void();
}
Future<Void> writeKeyspaceSnapshotFile(std::vector<std::string> fileNames, int64_t totalBytes) override {
Future<Void> writeKeyspaceSnapshotFile(std::vector<std::string> fileNames, int64_t totalBytes) final {
return writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem>::addRef(this), fileNames, totalBytes);
};

View File

@ -75,6 +75,7 @@ struct LogFile {
std::string fileName;
int64_t fileSize;
int tagId = -1; // Log router tag. Non-negative for new backup format.
int totalTags = -1; // Total number of log router tags.
// Order by beginVersion, break ties with endVersion
bool operator< (const LogFile &rhs) const {
@ -220,7 +221,7 @@ public:
// Open a tagged log file for writing, where tagId is the log router tag's id.
virtual Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
uint16_t tagId) = 0;
uint16_t tagId, int totalTags) = 0;
// Write a KeyspaceSnapshotFile of range file names representing a full non overlapping
// snapshot of the key ranges this backup is targeting.

View File

@ -37,8 +37,8 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) {
}
}
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit;
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit;
if (!backupStartedValue.present()) return toRecruit; // No active backups
@ -68,7 +68,7 @@ std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::g
.detail("EndVersion", info.epochEnd);
}
if (!tagVersions.empty()) {
toRecruit[{ epoch, info.epochEnd }] = tagVersions;
toRecruit[{ epoch, info.epochEnd, info.logRouterTags }] = tagVersions;
}
}
return toRecruit;
@ -115,11 +115,12 @@ TEST_CASE("/BackupProgress/Unfinished") {
BackupProgress progress(UID(0, 0), epochInfos);
progress.setBackupStartedValue(Optional<Value>(LiteralStringRef("1")));
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
ASSERT(unfinished.size() == 1);
for (const auto [epochVersion, tagVersion] : unfinished) {
ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1);
for (const auto [epochVersionCount, tagVersion] : unfinished) {
ASSERT(std::get<0>(epochVersionCount) == epoch1 && std::get<1>(epochVersionCount) == end1 &&
std::get<2>(epochVersionCount) == 1);
ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == begin1);
}
@ -128,8 +129,9 @@ TEST_CASE("/BackupProgress/Unfinished") {
progress.addBackupStatus(status1);
unfinished = progress.getUnfinishedBackup();
ASSERT(unfinished.size() == 1);
for (const auto [epochVersion, tagVersion] : unfinished) {
ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1);
for (const auto [epochVersionCount, tagVersion] : unfinished) {
ASSERT(std::get<0>(epochVersionCount) == epoch1 && std::get<1>(epochVersionCount) == end1 &&
std::get<2>(epochVersionCount) == 1);
ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == saved1 + 1);
}

View File

@ -25,6 +25,8 @@
#define FDBSERVER_BACKUPPROGRESS_ACTOR_H
#include <map>
#include <tuple>
#include "fdbclient/FDBTypes.h"
#include "fdbserver/LogSystem.h"
#include "flow/Arena.h"
@ -41,7 +43,7 @@ public:
// savedVersion is used.
void addBackupStatus(const WorkerBackupStatus& status);
// Returns a map of pair<Epoch, endVersion> : map<tag, savedVersion>, so that
// Returns a map of tuple<Epoch, endVersion, logRouterTags> : map<tag, savedVersion>, so that
// the backup range should be [savedVersion + 1, endVersion) for the "tag" of the "Epoch".
//
// Specifically, the backup ranges for each old epoch are:
@ -49,7 +51,7 @@ public:
// backup [epochBegin, endVersion)
// else if savedVersion < endVersion - 1 = knownCommittedVersion
// backup [savedVersion + 1, endVersion)
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> getUnfinishedBackup();
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> getUnfinishedBackup();
// Set the value for "backupStartedKey"
void setBackupStartedValue(Optional<Value> value) {

View File

@ -67,6 +67,7 @@ struct VersionedMessage {
struct BackupData {
const UID myId;
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
const int totalTags; // Total log router tags
const Version startVersion;
const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
const LogEpoch recruitedEpoch;
@ -102,9 +103,9 @@ struct BackupData {
Future<Void> logger;
explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
: myId(id), tag(req.routerTag), startVersion(req.startVersion), endVersion(req.endVersion),
recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
savedVersion(invalidVersion), cc("BackupWorker", myId.toString()) {
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(invalidVersion), cc("BackupWorker", myId.toString()) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
pullFinished.set(false);
@ -415,7 +416,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
it->second.lastSavedVersion = self->messages[0].getVersion();
}
logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id));
it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
it++;
}
if (activeUids.empty()) {
@ -646,6 +647,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
TraceEvent("BackupWorkerStart", self.myId)
.detail("Tag", req.routerTag.toString())
.detail("TotalTags", req.totalTags)
.detail("StartVersion", req.startVersion)
.detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
.detail("LogEpoch", req.recruitedEpoch)

View File

@ -170,6 +170,7 @@ struct InitializeBackupRequest {
LogEpoch backupEpoch; // The epoch the worker should work on. If different from the recruitedEpoch, then it refers
// to some previous epoch with unfinished work.
Tag routerTag;
int totalTags;
Version startVersion;
Optional<Version> endVersion;
ReplyPromise<struct InitializeBackupReply> reply;
@ -179,7 +180,7 @@ struct InitializeBackupRequest {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, startVersion, endVersion, reply);
serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, totalTags, startVersion, endVersion, reply);
}
};

View File

@ -1261,6 +1261,7 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self, Datab
req.recruitedEpoch = epoch;
req.backupEpoch = epoch;
req.routerTag = idsTags[i].second;
req.totalTags = logRouterTags;
req.startVersion = startVersion;
TraceEvent("BackupRecruitment", self->dbgid)
.detail("BKID", req.reqId)
@ -1275,17 +1276,19 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self, Datab
}
wait(gotProgress);
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit = backupProgress->getUnfinishedBackup();
for (const auto& [epochVersion, tagVersions] : toRecruit) {
std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit =
backupProgress->getUnfinishedBackup();
for (const auto& [epochVersionCount, tagVersions] : toRecruit) {
for (const auto& [tag, version] : tagVersions) {
const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
i++;
InitializeBackupRequest req(deterministicRandom()->randomUniqueID());
req.recruitedEpoch = epoch;
req.backupEpoch = epochVersion.first;
req.backupEpoch = std::get<0>(epochVersionCount);
req.routerTag = tag;
req.totalTags = std::get<2>(epochVersionCount);
req.startVersion = version; // savedVersion + 1
req.endVersion = epochVersion.second - 1;
req.endVersion = std::get<1>(epochVersionCount) - 1;
TraceEvent("BackupRecruitment", self->dbgid)
.detail("BKID", req.reqId)
.detail("Tag", req.routerTag.toString())