Include the total number of tags in partitioned log file names

This is needed for BackupContainer to check that partitioned mutation logs are continuous, i.e., restorable to a version.
parent 64859467e4
commit fda6c08640
--- a/fdbclient/BackupContainer.actor.cpp
+++ b/fdbclient/BackupContainer.actor.cpp
@@ -344,10 +344,11 @@ public:
 	}
 
 	Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
-	                                                  uint16_t tagId) final {
+	                                                  uint16_t tagId, int totalTags) final {
 		return writeFile(logVersionFolderString(beginVersion, true) +
-		                 format("log,%lld,%lld,%s,%d,%d", beginVersion, endVersion,
-		                        deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId));
+		                 format("log,%lld,%lld,%s,%d,%d-of-%d", beginVersion, endVersion,
+		                        deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId,
+		                        totalTags));
 	}
 
 	Future<Reference<IBackupFile>> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount, Version fileVersion, int blockSize) override {
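For illustration, the name layout changes from log,<begin>,<end>,<UID>,<blockSize>,<tagId> to log,<begin>,<end>,<UID>,<blockSize>,<tagId>-of-<totalTags>. A minimal sketch of the new layout with invented values, using std::snprintf in place of FDB's format() helper and a made-up UID string:

    // Sketch only: real names use a random 32-hex-character UID.
    #include <cinttypes>
    #include <cstdint>
    #include <cstdio>

    int main() {
        int64_t beginVersion = 100, endVersion = 200;
        int blockSize = 1048576, tagId = 3, totalTags = 10;
        char name[128];
        std::snprintf(name, sizeof(name), "log,%" PRId64 ",%" PRId64 ",%s,%d,%d-of-%d",
                      beginVersion, endVersion, "0123456789abcdef0123456789abcdef",
                      blockSize, tagId, totalTags);
        std::puts(name); // log,100,200,0123456789abcdef0123456789abcdef,1048576,3-of-10
        return 0;
    }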
@@ -398,8 +399,8 @@ public:
 		if(sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size()) {
 			out = f;
 			return true;
-		} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d%n", &f.beginVersion, &f.endVersion,
-		                  &f.blockSize, &f.tagId, &len) == 4 &&
+		} else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d-of-%d%n", &f.beginVersion,
+		                  &f.endVersion, &f.blockSize, &f.tagId, &f.totalTags, &len) == 5 &&
 		           len == name.size() && f.tagId >= 0) {
 			out = f;
 			return true;
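The first sscanf pattern still matches old five-field names; for a new-style name it stops before the "-of-<totalTags>" suffix, so the length captured by %n fails the len == name.size() check and control falls through to the new pattern. A standalone sketch of the new-format parse, with invented values:

    #include <cinttypes>
    #include <cstdio>
    #include <string>

    int main() {
        std::string name = "log,100,200,0123456789abcdef0123456789abcdef,1048576,3-of-10";
        int64_t begin = 0, end = 0;
        unsigned blockSize = 0;
        int tagId = 0, totalTags = 0, len = 0;
        // %*[^,] skips the UID field; %n records how many characters matched.
        if (std::sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d-of-%d%n",
                        &begin, &end, &blockSize, &tagId, &totalTags, &len) == 5 &&
            len == (int)name.size()) {
            std::printf("tag %d of %d covers [%" PRId64 ", %" PRId64 ")\n",
                        tagId, totalTags, begin, end);
        }
        return 0;
    }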
@@ -488,7 +489,6 @@ public:
 	ACTOR static Future<Void> writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem> bc, std::vector<std::string> fileNames, int64_t totalBytes) {
 		ASSERT(!fileNames.empty());
 
-
 		state Version minVer = std::numeric_limits<Version>::max();
 		state Version maxVer = 0;
 		state RangeFile rf;
@@ -528,7 +528,7 @@ public:
 		return Void();
 	}
 
-	Future<Void> writeKeyspaceSnapshotFile(std::vector<std::string> fileNames, int64_t totalBytes) override {
+	Future<Void> writeKeyspaceSnapshotFile(std::vector<std::string> fileNames, int64_t totalBytes) final {
 		return writeKeyspaceSnapshotFile_impl(Reference<BackupContainerFileSystem>::addRef(this), fileNames, totalBytes);
 	};
 
--- a/fdbclient/BackupContainer.h
+++ b/fdbclient/BackupContainer.h
@@ -75,6 +75,7 @@ struct LogFile {
 	std::string fileName;
 	int64_t fileSize;
 	int tagId = -1; // Log router tag. Non-negative for new backup format.
+	int totalTags = -1; // Total number of log router tags.
 
 	// Order by beginVersion, break ties with endVersion
 	bool operator< (const LogFile &rhs) const {
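The commit message's motivation shows up here: once each LogFile records totalTags, a reader can tell whether a version range of partitioned logs is complete. The following is only a sketch of that idea under the new fields, with a pared-down LogFile and a hypothetical helper rangeHasAllTags, not the eventual BackupContainer implementation:

    #include <cstdint>
    #include <set>
    #include <vector>

    using Version = int64_t;

    struct LogFile {
        Version beginVersion = 0;
        Version endVersion = 0;
        int tagId = -1;     // Log router tag; non-negative for the new format.
        int totalTags = -1; // Total number of log router tags.
    };

    // Sketch: a [begin, end) range of partitioned logs is only restorable if
    // every expected tag 0 .. totalTags-1 contributed a file for that range.
    bool rangeHasAllTags(const std::vector<LogFile>& files, Version begin, Version end) {
        std::set<int> seen;
        int expected = -1;
        for (const auto& f : files) {
            if (f.beginVersion == begin && f.endVersion == end && f.tagId >= 0) {
                seen.insert(f.tagId);
                expected = f.totalTags;
            }
        }
        return expected > 0 && (int)seen.size() == expected;
    }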
@@ -220,7 +221,7 @@ public:
 
 	// Open a tagged log file for writing, where tagId is the log router tag's id.
 	virtual Future<Reference<IBackupFile>> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize,
-	                                                          uint16_t tagId) = 0;
+	                                                          uint16_t tagId, int totalTags) = 0;
 
 	// Write a KeyspaceSnapshotFile of range file names representing a full non overlapping
 	// snapshot of the key ranges this backup is targeting.
--- a/fdbserver/BackupProgress.actor.cpp
+++ b/fdbserver/BackupProgress.actor.cpp
@@ -37,8 +37,8 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) {
 	}
 }
 
-std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
-	std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit;
+std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
+	std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit;
 
 	if (!backupStartedValue.present()) return toRecruit; // No active backups
 
@@ -68,7 +68,7 @@ std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::g
 			    .detail("EndVersion", info.epochEnd);
 		}
 		if (!tagVersions.empty()) {
-			toRecruit[{ epoch, info.epochEnd }] = tagVersions;
+			toRecruit[{ epoch, info.epochEnd, info.logRouterTags }] = tagVersions;
 		}
 	}
 	return toRecruit;
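Brace-initialized keys like the one above still work after the switch from std::pair to std::tuple, and the map stays ordered lexicographically by (epoch, endVersion, logRouterTags). A minimal self-contained illustration with invented values and type aliases:

    #include <cstdint>
    #include <cstdio>
    #include <map>
    #include <tuple>

    using LogEpoch = uint64_t;
    using Version = int64_t;

    int main() {
        std::map<std::tuple<LogEpoch, Version, int>, int> toRecruit;
        toRecruit[{ 2, 100, 4 }] = 7; // epoch 2, endVersion 100, 4 log router tags
        for (const auto& [key, value] : toRecruit) {
            std::printf("epoch=%llu end=%lld tags=%d -> %d\n",
                        (unsigned long long)std::get<0>(key), (long long)std::get<1>(key),
                        std::get<2>(key), value);
        }
        return 0;
    }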
@@ -115,11 +115,12 @@ TEST_CASE("/BackupProgress/Unfinished") {
 	BackupProgress progress(UID(0, 0), epochInfos);
 	progress.setBackupStartedValue(Optional<Value>(LiteralStringRef("1")));
 
-	std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
+	std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
 
 	ASSERT(unfinished.size() == 1);
-	for (const auto [epochVersion, tagVersion] : unfinished) {
-		ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1);
+	for (const auto [epochVersionCount, tagVersion] : unfinished) {
+		ASSERT(std::get<0>(epochVersionCount) == epoch1 && std::get<1>(epochVersionCount) == end1 &&
+		       std::get<2>(epochVersionCount) == 1);
 		ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == begin1);
 	}
 
@@ -128,8 +129,9 @@ TEST_CASE("/BackupProgress/Unfinished") {
 	progress.addBackupStatus(status1);
 	unfinished = progress.getUnfinishedBackup();
 	ASSERT(unfinished.size() == 1);
-	for (const auto [epochVersion, tagVersion] : unfinished) {
-		ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1);
+	for (const auto [epochVersionCount, tagVersion] : unfinished) {
+		ASSERT(std::get<0>(epochVersionCount) == epoch1 && std::get<1>(epochVersionCount) == end1 &&
+		       std::get<2>(epochVersionCount) == 1);
 		ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == saved1 + 1);
 	}
 
--- a/fdbserver/BackupProgress.actor.h
+++ b/fdbserver/BackupProgress.actor.h
@@ -25,6 +25,8 @@
 #define FDBSERVER_BACKUPPROGRESS_ACTOR_H
 
+#include <map>
+#include <tuple>
 #include "fdbclient/FDBTypes.h"
 #include "fdbserver/LogSystem.h"
 #include "flow/Arena.h"
 
@@ -41,7 +43,7 @@ public:
 	// savedVersion is used.
 	void addBackupStatus(const WorkerBackupStatus& status);
 
-	// Returns a map of pair<Epoch, endVersion> : map<tag, savedVersion>, so that
+	// Returns a map of tuple<Epoch, endVersion, logRouterTags> : map<tag, savedVersion>, so that
 	// the backup range should be [savedVersion + 1, endVersion) for the "tag" of the "Epoch".
 	//
 	// Specifically, the backup ranges for each old epoch are:
@@ -49,7 +51,7 @@ public:
 	//   backup [epochBegin, endVersion)
 	// else if savedVersion < endVersion - 1 = knownCommittedVersion
 	//   backup [savedVersion + 1, endVersion)
-	std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> getUnfinishedBackup();
+	std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> getUnfinishedBackup();
 
 	// Set the value for "backupStartedKey"
 	void setBackupStartedValue(Optional<Value> value) {
--- a/fdbserver/BackupWorker.actor.cpp
+++ b/fdbserver/BackupWorker.actor.cpp
@@ -67,6 +67,7 @@ struct VersionedMessage {
 struct BackupData {
 	const UID myId;
 	const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
+	const int totalTags; // Total log router tags
 	const Version startVersion;
 	const Optional<Version> endVersion; // old epoch's end version (inclusive), or empty for current epoch
 	const LogEpoch recruitedEpoch;
@@ -102,9 +103,9 @@ struct BackupData {
 	Future<Void> logger;
 
 	explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
-	  : myId(id), tag(req.routerTag), startVersion(req.startVersion), endVersion(req.endVersion),
-	    recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
-	    savedVersion(invalidVersion), cc("BackupWorker", myId.toString()) {
+	  : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
+	    endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
+	    minKnownCommittedVersion(invalidVersion), savedVersion(invalidVersion), cc("BackupWorker", myId.toString()) {
 		cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
 		pullFinished.set(false);
 
@@ -417,7 +418,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
 			it->second.lastSavedVersion = self->messages[0].getVersion();
 		}
 		logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
-		    it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id));
+		    it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
 		it++;
 	}
 	if (activeUids.empty()) {
@@ -648,6 +649,7 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
 
 	TraceEvent("BackupWorkerStart", self.myId)
 	    .detail("Tag", req.routerTag.toString())
+	    .detail("TotalTags", req.totalTags)
 	    .detail("StartVersion", req.startVersion)
 	    .detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1)
 	    .detail("LogEpoch", req.recruitedEpoch)
--- a/fdbserver/WorkerInterface.actor.h
+++ b/fdbserver/WorkerInterface.actor.h
@@ -170,6 +170,7 @@ struct InitializeBackupRequest {
 	LogEpoch backupEpoch; // The epoch the worker should work on. If different from the recruitedEpoch, then it refers
 	                      // to some previous epoch with unfinished work.
 	Tag routerTag;
+	int totalTags;
 	Version startVersion;
 	Optional<Version> endVersion;
 	ReplyPromise<struct InitializeBackupReply> reply;
@@ -179,7 +180,7 @@ struct InitializeBackupRequest {
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, startVersion, endVersion, reply);
+		serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, totalTags, startVersion, endVersion, reply);
 	}
 };
 
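A struct field only travels with the request if it also appears in the serializer() field list, which is why totalTags is added in both places above. A toy illustration of that pattern; this PrintArchive merely stands in for flow's archive types, which work differently in detail:

    #include <cstdio>

    // Toy archive: "serializes" by printing each field, to show that the set
    // and order of serialized fields come entirely from the serializer() call.
    struct PrintArchive {
        template <class T>
        void operator&(const T& field) { std::printf("field: %d\n", (int)field); }
    };

    template <class Ar, class T, class... Ts>
    void serializer(Ar& ar, const T& field, const Ts&... rest) {
        ar & field;
        if constexpr (sizeof...(rest) > 0) serializer(ar, rest...);
    }

    int main() {
        PrintArchive ar;
        int routerTag = 1, totalTags = 4, startVersion = 100;
        // Omitting totalTags here would silently drop it from the wire format.
        serializer(ar, routerTag, totalTags, startVersion);
        return 0;
    }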
--- a/fdbserver/masterserver.actor.cpp
+++ b/fdbserver/masterserver.actor.cpp
@@ -1261,6 +1261,7 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self, Datab
 		req.recruitedEpoch = epoch;
 		req.backupEpoch = epoch;
 		req.routerTag = idsTags[i].second;
+		req.totalTags = logRouterTags;
 		req.startVersion = startVersion;
 		TraceEvent("BackupRecruitment", self->dbgid)
 		    .detail("BKID", req.reqId)
@@ -1275,17 +1276,19 @@
 	}
 
 	wait(gotProgress);
-	std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit = backupProgress->getUnfinishedBackup();
-	for (const auto& [epochVersion, tagVersions] : toRecruit) {
+	std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> toRecruit =
+	    backupProgress->getUnfinishedBackup();
+	for (const auto& [epochVersionCount, tagVersions] : toRecruit) {
 		for (const auto& [tag, version] : tagVersions) {
 			const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
 			i++;
 			InitializeBackupRequest req(deterministicRandom()->randomUniqueID());
 			req.recruitedEpoch = epoch;
-			req.backupEpoch = epochVersion.first;
+			req.backupEpoch = std::get<0>(epochVersionCount);
 			req.routerTag = tag;
+			req.totalTags = std::get<2>(epochVersionCount);
 			req.startVersion = version; // savedVersion + 1
-			req.endVersion = epochVersion.second - 1;
+			req.endVersion = std::get<1>(epochVersionCount) - 1;
 			TraceEvent("BackupRecruitment", self->dbgid)
 			    .detail("BKID", req.reqId)
 			    .detail("Tag", req.routerTag.toString())