BackupProgress uses old epoch's begin version if no progress found
Get rid of the complex logic of choosing the largest saved version from previous epoch for the oldest epoch. Instead, use the begin version now available from log system.
This commit is contained in:
parent
42430e8f5e
commit
4ed75e37f3
|
@ -20,65 +20,38 @@ void BackupProgress::addBackupStatus(const WorkerBackupStatus& status) {
|
|||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> BackupProgress::getUnfinishedBackup() {
|
||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit;
|
||||
|
||||
Version lastEpochEndVersion = invalidVersion;
|
||||
for (const auto& [epoch, tagsAndEndVersion] : epochTagsEndVersions) {
|
||||
if (lastEpochEndVersion == invalidVersion) {
|
||||
lastEpochEndVersion = getLastEpochEndVersion(epoch);
|
||||
TraceEvent("BW", dbgid).detail("Epoch", epoch).detail("LastEndVersion", lastEpochEndVersion);
|
||||
}
|
||||
std::set<Tag> tags = enumerateLogRouterTags(tagsAndEndVersion.first);
|
||||
const Version& endVersion = tagsAndEndVersion.second;
|
||||
for (const auto& [epoch, info] : epochInfos) {
|
||||
std::set<Tag> tags = enumerateLogRouterTags(info.logRouterTags);
|
||||
std::map<Tag, Version> tagVersions;
|
||||
auto progressIt = progress.find(epoch);
|
||||
if (progressIt != progress.end()) {
|
||||
for (const auto& [tag, savedVersion] : progressIt->second) {
|
||||
tags.erase(tag);
|
||||
if (savedVersion < endVersion - 1) {
|
||||
if (savedVersion < info.epochEnd - 1) {
|
||||
tagVersions.insert({ tag, savedVersion });
|
||||
TraceEvent("BW", dbgid)
|
||||
.detail("OldEpoch", epoch)
|
||||
.detail("Tag", tag.toString())
|
||||
.detail("Version", savedVersion)
|
||||
.detail("EpochEndVersion", endVersion);
|
||||
.detail("BeginVersion", savedVersion)
|
||||
.detail("EndVersion", info.epochEnd);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (const Tag tag : tags) { // tags without progress data
|
||||
tagVersions.insert({ tag, lastEpochEndVersion });
|
||||
tagVersions.insert({ tag, info.epochBegin - 1 });
|
||||
TraceEvent("BW", dbgid)
|
||||
.detail("OldEpoch", epoch)
|
||||
.detail("Tag", tag.toString())
|
||||
.detail("Version", lastEpochEndVersion)
|
||||
.detail("EpochEndVersion", endVersion);
|
||||
.detail("BeginVersion", info.epochBegin - 1)
|
||||
.detail("EndVersion", info.epochEnd);
|
||||
}
|
||||
if (!tagVersions.empty()) {
|
||||
toRecruit[{ epoch, endVersion }] = tagVersions;
|
||||
toRecruit[{ epoch, info.epochEnd }] = tagVersions;
|
||||
}
|
||||
lastEpochEndVersion = tagsAndEndVersion.second;
|
||||
}
|
||||
return toRecruit;
|
||||
}
|
||||
|
||||
Version BackupProgress::getLastEpochEndVersion(LogEpoch epoch) {
|
||||
auto it = progress.lower_bound(epoch);
|
||||
if (it != progress.end()) {
|
||||
if (it == progress.begin()) {
|
||||
it = progress.end();
|
||||
} else {
|
||||
it--;
|
||||
}
|
||||
} else if (!progress.empty()) {
|
||||
it = --progress.end();
|
||||
}
|
||||
if (it == progress.end()) return 1;
|
||||
|
||||
Version v = 0;
|
||||
for (const auto& [tag, savedVersion] : it->second) {
|
||||
v = std::max(v, savedVersion);
|
||||
}
|
||||
return v + 1;
|
||||
}
|
||||
|
||||
// Returns each tag's savedVersion for all epochs.
|
||||
ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupProgress> bStatus) {
|
||||
state Transaction tr(cx);
|
||||
|
@ -108,19 +81,19 @@ ACTOR Future<Void> getBackupProgress(Database cx, UID dbgid, Reference<BackupPro
|
|||
}
|
||||
|
||||
TEST_CASE("/BackupProgress/Unfinished") {
|
||||
std::map<LogEpoch, std::pair<int, Version>> epochTagsEndVersions;
|
||||
std::map<LogEpoch, ILogSystem::EpochTagsVersionsInfo> epochInfos;
|
||||
|
||||
const int epoch1 = 2, end1 = 100;
|
||||
const int epoch1 = 2, begin1 = 1, end1 = 100;
|
||||
const Tag tag1(tagLocalityLogRouter, 0);
|
||||
epochTagsEndVersions.insert({ epoch1, { 1, end1 } });
|
||||
BackupProgress progress(UID(0, 0), epochTagsEndVersions);
|
||||
epochInfos.insert({ epoch1, ILogSystem::EpochTagsVersionsInfo(1, begin1, end1) });
|
||||
BackupProgress progress(UID(0, 0), epochInfos);
|
||||
|
||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> unfinished = progress.getUnfinishedBackup();
|
||||
|
||||
ASSERT(unfinished.size() == 1);
|
||||
for (const auto [epochVersion, tagVersion] : unfinished) {
|
||||
ASSERT(epochVersion.first == epoch1 && epochVersion.second == end1);
|
||||
ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == 1);
|
||||
ASSERT(tagVersion.size() == 1 && tagVersion.begin()->first == tag1 && tagVersion.begin()->second == begin1 - 1);
|
||||
}
|
||||
|
||||
const int saved1 = 50;
|
||||
|
|
|
@ -26,14 +26,15 @@
|
|||
|
||||
#include <map>
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbserver/LogSystem.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/FastRef.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
class BackupProgress : NonCopyable, ReferenceCounted<BackupProgress> {
|
||||
public:
|
||||
BackupProgress(UID id, const std::map<LogEpoch, std::pair<int, Version>>& epochTagsEndVersions)
|
||||
: dbgid(id), epochTagsEndVersions(epochTagsEndVersions) {}
|
||||
BackupProgress(UID id, const std::map<LogEpoch, ILogSystem::EpochTagsVersionsInfo>& infos)
|
||||
: dbgid(id), epochInfos(infos) {}
|
||||
~BackupProgress() {}
|
||||
|
||||
// Adds a backup status. If the tag already has an entry, then the max of
|
||||
|
@ -43,12 +44,11 @@ public:
|
|||
// Returns a map of pair<Epoch, endVersion> : map<tag, savedVersion>, so that
|
||||
// the backup range should be [savedVersion + 1, endVersion) for the "tag" of the "Epoch".
|
||||
//
|
||||
// for each old epoch:
|
||||
// Specifically, the backup ranges for each old epoch are:
|
||||
// if tag in tags_without_backup_progress:
|
||||
// savedVersion = last epoch's endVersion - 1 (First epoch's startVersion = 1)
|
||||
// backup [epochBegin, endVersion)
|
||||
// else if savedVersion < endVersion - 1 = knownCommittedVersion
|
||||
// savedVersion = tag's_savedVersion
|
||||
// backup [savedVersion + 1, endVersion)
|
||||
// backup [savedVersion + 1, endVersion)
|
||||
std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> getUnfinishedBackup();
|
||||
|
||||
void addref() override { ReferenceCounted<BackupProgress>::addref(); }
|
||||
|
@ -56,12 +56,6 @@ public:
|
|||
void delref() override { ReferenceCounted<BackupProgress>::delref(); }
|
||||
|
||||
private:
|
||||
// Returns the previous epoch's EndVersion. Note epoch is not continuous, and
|
||||
// we often has previous epoch N and current epoch N + 2. In case of consecutive
|
||||
// recovery, more gaps are possible. The idea here is to find the last epoch whose
|
||||
// epoch number most close to "epoch", and return the max(savedVersion) + 1.
|
||||
Version getLastEpochEndVersion(LogEpoch epoch);
|
||||
|
||||
std::set<Tag> enumerateLogRouterTags(int logRouterTags) {
|
||||
std::set<Tag> tags;
|
||||
for (int i = 0; i < logRouterTags; i++) {
|
||||
|
@ -72,12 +66,12 @@ private:
|
|||
|
||||
const UID dbgid;
|
||||
|
||||
// Note this should be iterated in ascending order.
|
||||
const std::map<LogEpoch, std::pair<int, Version>> epochTagsEndVersions;
|
||||
// Note this MUST be iterated in ascending order.
|
||||
const std::map<LogEpoch, ILogSystem::EpochTagsVersionsInfo> epochInfos;
|
||||
|
||||
// Backup progress saved in the system keyspace. Note there can be multiple
|
||||
// progress status for a tag in an epoch due to later epoch trying to fill
|
||||
// the gap. "progress" should be iterated in ascending order.
|
||||
// the gap. "progress" MUST be iterated in ascending order.
|
||||
std::map<LogEpoch, std::map<Tag, Version>> progress;
|
||||
};
|
||||
|
||||
|
|
|
@ -716,8 +716,16 @@ struct ILogSystem {
|
|||
|
||||
virtual Version getStartVersion() const = 0; // Returns the start version of current epoch.
|
||||
|
||||
// Returns (tags, endVersion) pair for old epochs that this log system is aware of, excluding the current epoch.
|
||||
virtual std::map<LogEpoch, std::pair<int32_t, Version>> getOldEpochTagsAndEndVersions() const = 0;
|
||||
struct EpochTagsVersionsInfo {
|
||||
int32_t logRouterTags; // Number of log router tags.
|
||||
Version epochBegin, epochEnd;
|
||||
|
||||
explicit EpochTagsVersionsInfo(int32_t n, Version begin, Version end)
|
||||
: logRouterTags(n), epochBegin(begin), epochEnd(end) {}
|
||||
};
|
||||
|
||||
// Returns EpochTagVersionsInfo for old epochs that this log system is aware of, excluding the current epoch.
|
||||
virtual std::map<LogEpoch, EpochTagsVersionsInfo> getOldEpochTagsVersionsInfo() const = 0;
|
||||
|
||||
virtual Future<Reference<ILogSystem>> newEpoch( struct RecruitFromConfigurationReply const& recr, Future<struct RecruitRemoteFromConfigurationReply> const& fRemoteWorkers, DatabaseConfiguration const& config,
|
||||
LogEpoch recoveryCount, int8_t primaryLocality, int8_t remoteLocality, std::vector<Tag> const& allTags, Reference<AsyncVar<bool>> const& recruitmentStalled ) = 0;
|
||||
|
|
|
@ -1357,16 +1357,18 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
return tLogs[0]->startVersion;
|
||||
}
|
||||
|
||||
std::map<LogEpoch, std::pair<int32_t, Version>> getOldEpochTagsAndEndVersions() const override {
|
||||
std::map<LogEpoch, std::pair<int32_t, Version>> epochTagsVersions;
|
||||
std::map<LogEpoch, ILogSystem::EpochTagsVersionsInfo> getOldEpochTagsVersionsInfo() const override {
|
||||
std::map<LogEpoch, EpochTagsVersionsInfo> epochInfos;
|
||||
for (const auto& old : oldLogData) {
|
||||
epochTagsVersions[old.epoch] = { old.logRouterTags, old.epochEnd };
|
||||
epochInfos.insert(
|
||||
{ old.epoch, ILogSystem::EpochTagsVersionsInfo(old.logRouterTags, old.epochBegin, old.epochEnd) });
|
||||
TraceEvent("OldEpochTagsVersions", dbgid)
|
||||
.detail("Epoch", old.epoch)
|
||||
.detail("Tags", old.logRouterTags)
|
||||
.detail("BeginVersion", old.epochBegin)
|
||||
.detail("EndVersion", old.epochEnd);
|
||||
}
|
||||
return epochTagsVersions;
|
||||
return epochInfos;
|
||||
}
|
||||
|
||||
inline Reference<LogSet> getEpochLogSet(LogEpoch epoch) const {
|
||||
|
|
|
@ -1241,7 +1241,7 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self) {
|
|||
state LogEpoch epoch = self->cstate.myDBState.recoveryCount;
|
||||
state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, true, true);
|
||||
state Reference<BackupProgress> backupProgress(
|
||||
new BackupProgress(self->dbgid, self->logSystem->getOldEpochTagsAndEndVersions()));
|
||||
new BackupProgress(self->dbgid, self->logSystem->getOldEpochTagsVersionsInfo()));
|
||||
state Future<Void> gotProgress = getBackupProgress(cx, self->dbgid, backupProgress);
|
||||
state std::vector<Future<BackupInterface>> initializationReplies;
|
||||
|
||||
|
|
Loading…
Reference in New Issue