diff --git a/fdbclient/BackupContainer.actor.cpp b/fdbclient/BackupContainer.actor.cpp index 9397be7b1d..3b9ce2389e 100644 --- a/fdbclient/BackupContainer.actor.cpp +++ b/fdbclient/BackupContainer.actor.cpp @@ -344,10 +344,11 @@ public: } Future> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize, - uint16_t tagId) final { + uint16_t tagId, int totalTags) final { return writeFile(logVersionFolderString(beginVersion, true) + - format("log,%lld,%lld,%s,%d,%d", beginVersion, endVersion, - deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId)); + format("log,%lld,%lld,%s,%d,%d-of-%d", beginVersion, endVersion, + deterministicRandom()->randomUniqueID().toString().c_str(), blockSize, tagId, + totalTags)); } Future> writeRangeFile(Version snapshotBeginVersion, int snapshotFileCount, Version fileVersion, int blockSize) override { @@ -398,8 +399,8 @@ public: if(sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u%n", &f.beginVersion, &f.endVersion, &f.blockSize, &len) == 3 && len == name.size()) { out = f; return true; - } else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d%n", &f.beginVersion, &f.endVersion, - &f.blockSize, &f.tagId, &len) == 4 && + } else if (sscanf(name.c_str(), "log,%" SCNd64 ",%" SCNd64 ",%*[^,],%u,%d-of-%d%n", &f.beginVersion, + &f.endVersion, &f.blockSize, &f.tagId, &f.totalTags, &len) == 5 && len == name.size() && f.tagId >= 0) { out = f; return true; @@ -488,7 +489,6 @@ public: ACTOR static Future writeKeyspaceSnapshotFile_impl(Reference bc, std::vector fileNames, int64_t totalBytes) { ASSERT(!fileNames.empty()); - state Version minVer = std::numeric_limits::max(); state Version maxVer = 0; state RangeFile rf; @@ -528,7 +528,7 @@ public: return Void(); } - Future writeKeyspaceSnapshotFile(std::vector fileNames, int64_t totalBytes) override { + Future writeKeyspaceSnapshotFile(std::vector fileNames, int64_t totalBytes) final { return writeKeyspaceSnapshotFile_impl(Reference::addRef(this), fileNames, totalBytes); }; diff --git a/fdbclient/BackupContainer.h b/fdbclient/BackupContainer.h index 9c4526e6f4..437f6e3eaa 100644 --- a/fdbclient/BackupContainer.h +++ b/fdbclient/BackupContainer.h @@ -75,6 +75,7 @@ struct LogFile { std::string fileName; int64_t fileSize; int tagId = -1; // Log router tag. Non-negative for new backup format. + int totalTags = -1; // Total number of log router tags. // Order by beginVersion, break ties with endVersion bool operator< (const LogFile &rhs) const { @@ -220,7 +221,7 @@ public: // Open a tagged log file for writing, where tagId is the log router tag's id. virtual Future> writeTaggedLogFile(Version beginVersion, Version endVersion, int blockSize, - uint16_t tagId) = 0; + uint16_t tagId, int totalTags) = 0; // Write a KeyspaceSnapshotFile of range file names representing a full non overlapping // snapshot of the key ranges this backup is targeting. diff --git a/fdbserver/BackupProgress.actor.cpp b/fdbserver/BackupProgress.actor.cpp index 0e7ccbaaa9..5492db7aa8 100644 --- a/fdbserver/BackupProgress.actor.cpp +++ b/fdbserver/BackupProgress.actor.cpp @@ -37,8 +37,8 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) { } } -std::map, std::map> BackupProgress::getUnfinishedBackup() { - std::map, std::map> toRecruit; +std::map, std::map> BackupProgress::getUnfinishedBackup() { + std::map, std::map> toRecruit; if (!backupStartedValue.present()) return toRecruit; // No active backups @@ -68,7 +68,7 @@ std::map, std::map> BackupProgress::g .detail("EndVersion", info.epochEnd); } if (!tagVersions.empty()) { - toRecruit[{ epoch, info.epochEnd }] = tagVersions; + toRecruit[{ epoch, info.epochEnd, info.logRouterTags }] = tagVersions; } } return toRecruit; @@ -115,11 +115,12 @@ TEST_CASE("/BackupProgress/Unfinished") { BackupProgress progress(UID(0, 0), epochInfos); progress.setBackupStartedValue(Optional(LiteralStringRef("1"))); - std::map, std::map> unfinished = progress.getUnfinishedBackup(); + std::map, std::map> unfinished = progress.getUnfinishedBackup(); ASSERT(unfinished.size() == 1); - for (const auto [epochVersion, tagVersion] : unfinished) { - ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1); + for (const auto [epochVersionCount, tagVersion] : unfinished) { + ASSERT(std::get<0>(epochVersionCount) == epoch1 && std::get<1>(epochVersionCount) == end1 && + std::get<2>(epochVersionCount) == 1); ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == begin1); } @@ -128,8 +129,9 @@ TEST_CASE("/BackupProgress/Unfinished") { progress.addBackupStatus(status1); unfinished = progress.getUnfinishedBackup(); ASSERT(unfinished.size() == 1); - for (const auto [epochVersion, tagVersion] : unfinished) { - ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1); + for (const auto [epochVersionCount, tagVersion] : unfinished) { + ASSERT(std::get<0>(epochVersionCount) == epoch1 && std::get<1>(epochVersionCount) == end1 && + std::get<2>(epochVersionCount) == 1); ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == saved1 + 1); } diff --git a/fdbserver/BackupProgress.actor.h b/fdbserver/BackupProgress.actor.h index f12002dbfe..90e93fc95e 100644 --- a/fdbserver/BackupProgress.actor.h +++ b/fdbserver/BackupProgress.actor.h @@ -25,6 +25,8 @@ #define FDBSERVER_BACKUPPROGRESS_ACTOR_H #include +#include + #include "fdbclient/FDBTypes.h" #include "fdbserver/LogSystem.h" #include "flow/Arena.h" @@ -41,7 +43,7 @@ public: // savedVersion is used. void addBackupStatus(const WorkerBackupStatus& status); - // Returns a map of pair : map, so that + // Returns a map of tuple : map, so that // the backup range should be [savedVersion + 1, endVersion) for the "tag" of the "Epoch". // // Specifically, the backup ranges for each old epoch are: @@ -49,7 +51,7 @@ public: // backup [epochBegin, endVersion) // else if savedVersion < endVersion - 1 = knownCommittedVersion // backup [savedVersion + 1, endVersion) - std::map, std::map> getUnfinishedBackup(); + std::map, std::map> getUnfinishedBackup(); // Set the value for "backupStartedKey" void setBackupStartedValue(Optional value) { diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 9cbbbc0659..f2f4d675f8 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -67,6 +67,7 @@ struct VersionedMessage { struct BackupData { const UID myId; const Tag tag; // LogRouter tag for this worker, i.e., (-2, i) + const int totalTags; // Total log router tags const Version startVersion; const Optional endVersion; // old epoch's end version (inclusive), or empty for current epoch const LogEpoch recruitedEpoch; @@ -102,9 +103,9 @@ struct BackupData { Future logger; explicit BackupData(UID id, Reference> db, const InitializeBackupRequest& req) - : myId(id), tag(req.routerTag), startVersion(req.startVersion), endVersion(req.endVersion), - recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion), - savedVersion(invalidVersion), cc("BackupWorker", myId.toString()) { + : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), + endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), + minKnownCommittedVersion(invalidVersion), savedVersion(invalidVersion), cc("BackupWorker", myId.toString()) { cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true); pullFinished.set(false); @@ -417,7 +418,7 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int it->second.lastSavedVersion = self->messages[0].getVersion(); } logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile( - it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id)); + it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags)); it++; } if (activeUids.empty()) { @@ -648,6 +649,7 @@ ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest TraceEvent("BackupWorkerStart", self.myId) .detail("Tag", req.routerTag.toString()) + .detail("TotalTags", req.totalTags) .detail("StartVersion", req.startVersion) .detail("EndVersion", req.endVersion.present() ? req.endVersion.get() : -1) .detail("LogEpoch", req.recruitedEpoch) diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index ee613912a1..c8885cb4a0 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -170,6 +170,7 @@ struct InitializeBackupRequest { LogEpoch backupEpoch; // The epoch the worker should work on. If different from the recruitedEpoch, then it refers // to some previous epoch with unfinished work. Tag routerTag; + int totalTags; Version startVersion; Optional endVersion; ReplyPromise reply; @@ -179,7 +180,7 @@ struct InitializeBackupRequest { template void serialize(Ar& ar) { - serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, startVersion, endVersion, reply); + serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, totalTags, startVersion, endVersion, reply); } }; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index a556c2fec2..7acf67b72a 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1261,6 +1261,7 @@ ACTOR static Future recruitBackupWorkers(Reference self, Datab req.recruitedEpoch = epoch; req.backupEpoch = epoch; req.routerTag = idsTags[i].second; + req.totalTags = logRouterTags; req.startVersion = startVersion; TraceEvent("BackupRecruitment", self->dbgid) .detail("BKID", req.reqId) @@ -1275,17 +1276,19 @@ ACTOR static Future recruitBackupWorkers(Reference self, Datab } wait(gotProgress); - std::map, std::map> toRecruit = backupProgress->getUnfinishedBackup(); - for (const auto& [epochVersion, tagVersions] : toRecruit) { + std::map, std::map> toRecruit = + backupProgress->getUnfinishedBackup(); + for (const auto& [epochVersionCount, tagVersions] : toRecruit) { for (const auto& [tag, version] : tagVersions) { const auto& worker = self->backupWorkers[i % self->backupWorkers.size()]; i++; InitializeBackupRequest req(deterministicRandom()->randomUniqueID()); req.recruitedEpoch = epoch; - req.backupEpoch = epochVersion.first; + req.backupEpoch = std::get<0>(epochVersionCount); req.routerTag = tag; + req.totalTags = std::get<2>(epochVersionCount); req.startVersion = version; // savedVersion + 1 - req.endVersion = epochVersion.second - 1; + req.endVersion = std::get<1>(epochVersionCount) - 1; TraceEvent("BackupRecruitment", self->dbgid) .detail("BKID", req.reqId) .detail("Tag", req.routerTag.toString())