Recruit backup workers for old epochs
If there are unfinished ranges in the old epochs, the new master recruits backup workers responsible for finishing these ranges. These workers remain in the cluster until the next epoch, at which point they remove themselves.
parent ac851619bb
commit 19d6a889ff
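The rule behind this change (spelled out in the getUnfinishedBackup hunk below) is: for each old epoch whose end version is known, any log router tag whose saved progress is still below epochEnd - 1 gets a backup worker covering [savedVersion + 1, epochEnd - 1]. The following standalone sketch illustrates that computation with plain integers standing in for LogEpoch, Tag, and Version; the names are illustrative only, not code from this patch.

#include <cstdint>
#include <iostream>
#include <map>
#include <utility>

// Simplified stand-ins for FDB's LogEpoch, Tag and Version types.
using Epoch = int64_t;
using TagId = int;
using Version = int64_t;

// For each old epoch whose end version is known, collect the tags whose saved
// progress stops short of epochEnd - 1; those tags still need a backup worker
// covering [savedVersion + 1, epochEnd - 1].
std::map<std::pair<Epoch, Version>, std::map<TagId, Version>> unfinishedRanges(
    const std::map<Epoch, std::map<TagId, Version>>& progress,
    const std::map<Epoch, Version>& epochEndVersions) {
	std::map<std::pair<Epoch, Version>, std::map<TagId, Version>> toRecruit;
	for (const auto& [epoch, tagVersions] : progress) {
		auto end = epochEndVersions.find(epoch);
		if (end == epochEndVersions.end()) continue; // end version unknown: skip this epoch
		for (const auto& [tag, savedVersion] : tagVersions) {
			if (savedVersion < end->second - 1) {
				toRecruit[{ epoch, end->second }][tag] = savedVersion;
			}
		}
	}
	return toRecruit;
}

int main() {
	// Hypothetical sample: epoch 3 ended at version 500; tag 0 saved up to 400,
	// tag 1 already reached 499, so only tag 0 needs a backfill worker.
	std::map<Epoch, std::map<TagId, Version>> progress = { { 3, { { 0, 400 }, { 1, 499 } } } };
	std::map<Epoch, Version> epochEnd = { { 3, 500 } };

	for (const auto& [epochAndEnd, tags] : unfinishedRanges(progress, epochEnd)) {
		for (const auto& [tag, saved] : tags) {
			// Matches the recruitment below: startVersion = saved + 1, endVersion = epochEnd - 1.
			std::cout << "epoch " << epochAndEnd.first << " tag " << tag << " backup ["
			          << saved + 1 << ", " << epochAndEnd.second - 1 << "]\n";
		}
	}
	return 0;
}

Running it prints "epoch 3 tag 0 backup [401, 499]", which corresponds to how the master fills req.startVersion and req.endVersion in the recruitment loop below.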
@@ -971,4 +971,18 @@ struct HealthMetrics {
 	}
 };
 
+struct WorkerBackupStatus {
+	LogEpoch epoch;
+	Version version;
+	Tag tag;
+
+	WorkerBackupStatus() : epoch(0), version(invalidVersion) {}
+	WorkerBackupStatus(LogEpoch e, Version v, Tag t) : epoch(e), version(v), tag(t) {}
+
+	template <class Ar>
+	void serialize(Ar& ar) {
+		serializer(ar, epoch, version, tag);
+	}
+};
+
 #endif
@@ -502,9 +502,9 @@ const Key backupProgressKeyFor(UID workerID) {
 	return wr.toValue();
 }
 
-const Value backupProgressValue(LogEpoch epoch, Version version) {
+const Value backupProgressValue(const WorkerBackupStatus& status) {
 	BinaryWriter wr(IncludeVersion());
-	wr << epoch << version;
+	wr << status;
 	return wr.toValue();
 }
 
@@ -515,9 +515,11 @@ UID decodeBackupProgressKey(const KeyRef& key) {
 	return serverID;
 }
 
-void decodeBackupProgressValue(const ValueRef& value, LogEpoch& epoch, Version& version) {
+WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value) {
+	WorkerBackupStatus status;
 	BinaryReader reader(value, IncludeVersion());
-	reader >> epoch >> version;
+	reader >> status;
+	return status;
 }
 
 const KeyRef coordinatorsKey = LiteralStringRef("\xff/coordinators");
@@ -174,13 +174,13 @@ const Value workerListValue( ProcessData const& );
 Key decodeWorkerListKey( KeyRef const& );
 ProcessData decodeWorkerListValue( ValueRef const& );
 
-// "\xff/backupProgress/[[workerID]]" := "[[RecoveryCount, Version]]"
+// "\xff/backupProgress/[[workerID]]" := "[[WorkerBackupStatus]]"
 extern const KeyRangeRef backupProgressKeys;
 extern const KeyRef backupProgressPrefix;
 const Key backupProgressKeyFor(UID workerID);
-const Value backupProgressValue(LogEpoch epoch, Version version);
+const Value backupProgressValue(const WorkerBackupStatus& status);
 UID decodeBackupProgressKey(const KeyRef& key);
-void decodeBackupProgressValue(const ValueRef& value, LogEpoch& epoch, Version& version);
+WorkerBackupStatus decodeBackupProgressValue(const ValueRef& value);
 
 extern const KeyRef coordinatorsKey;
 extern const KeyRef logsKey;
@@ -32,8 +32,9 @@ struct BackupData {
 	const UID myId;
 	const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
 	const Version startVersion;
-	Version endVersion; // mutable in a new epoch, i.e., end version for this epoch is known.
-	const LogEpoch epoch;
+	const Version endVersion;
+	const LogEpoch recruitedEpoch;
+	const LogEpoch backupEpoch;
 	Version minKnownCommittedVersion;
 	Version poppedVersion;
 	AsyncVar<Reference<ILogSystem>> logSystem;
@@ -49,8 +50,8 @@ struct BackupData {
 	explicit BackupData(UID id, Reference<AsyncVar<ServerDBInfo>> db, const InitializeBackupRequest& req)
 	  : myId(id), tag(req.routerTag), startVersion(req.startVersion),
 	    endVersion(req.endVersion.present() ? req.endVersion.get() : std::numeric_limits<Version>::max()),
-	    epoch(req.epoch), minKnownCommittedVersion(invalidVersion), poppedVersion(invalidVersion),
-	    version(req.startVersion - 1), cc("BackupWorker", id.toString()) {
+	    recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion),
+	    poppedVersion(invalidVersion), version(req.startVersion - 1), cc("BackupWorker", id.toString()) {
 		cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
 
 		specialCounter(cc, "PoppedVersion", [this]() { return this->poppedVersion; });
@@ -69,7 +70,8 @@ ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
 			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
 			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
 
-			tr.set(backupProgressKeyFor(self->myId), backupProgressValue(self->epoch, backupVersion));
+			WorkerBackupStatus status(self->backupEpoch, backupVersion, self->tag);
+			tr.set(backupProgressKeyFor(self->myId), backupProgressValue(status));
 			wait(tr.commit());
 			return Void();
 		} catch (Error& e) {
@@ -107,6 +109,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
 			}
 			if (self->poppedVersion >= self->endVersion) {
 				self->backupDone.trigger();
+				return Void();
 			}
 		}
 	}
@@ -154,11 +157,10 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
 
 ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch recoveryCount,
                                 BackupData* self) {
-	state UID lastMasterID(0,0);
 	loop {
 		bool isDisplaced =
-		    (db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED);
-		// (db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::FULLY_RECOVERED));
+		    (db->get().recoveryCount > recoveryCount && db->get().recoveryState != RecoveryState::UNINITIALIZED) ||
+		    (db->get().recoveryCount == recoveryCount && db->get().recoveryState == RecoveryState::FULLY_RECOVERED);
 		isDisplaced = isDisplaced && !db->get().logSystemConfig.hasBackupWorker(self->myId);
 		if (isDisplaced) {
 			TraceEvent("BackupWorkerDisplaced", self->myId)
@@ -168,16 +170,6 @@ ACTOR Future<Void> checkRemoved(Reference<AsyncVar<ServerDBInfo>> db, LogEpoch r
 			    .detail("RecoveryState", (int)db->get().recoveryState);
 			throw worker_removed();
 		}
-		if (db->get().master.id() != lastMasterID) {
-			lastMasterID = db->get().master.id();
-			const Version endVersion = db->get().logSystemConfig.getEpochEndVersion(recoveryCount);
-			if (endVersion != invalidVersion) {
-				TraceEvent("BackupWorkerSet", self->myId)
-				    .detail("Before", self->endVersion)
-				    .detail("Now", endVersion - 1);
-				self->endVersion = endVersion - 1;
-			}
-		}
 		wait(db->onChange());
 	}
 }
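The checkRemoved changes above are what let a worker recruited for an old epoch retire: it is displaced either when a strictly newer recovery has started, or when its own recruitment epoch has reached FULLY_RECOVERED, and in either case only if the log system no longer lists it as a backup worker. A minimal sketch of that predicate, with a simplified RecoveryState enum and hypothetical names rather than the actual actor code:

#include <cassert>
#include <cstdint>

using LogEpoch = int64_t;

// Simplified stand-in for FDB's RecoveryState; only the states used in the check.
enum class RecoveryState { UNINITIALIZED, FULLY_RECOVERED, OTHER };

// Mirrors the condition in checkRemoved: displaced if a newer recovery is under way,
// or if our own recruitment epoch has fully recovered, and in either case only when
// the log system no longer lists us as a backup worker.
bool isDisplaced(LogEpoch dbRecoveryCount, RecoveryState dbState, LogEpoch recruitedEpoch,
                 bool listedAsBackupWorker) {
	bool displaced =
	    (dbRecoveryCount > recruitedEpoch && dbState != RecoveryState::UNINITIALIZED) ||
	    (dbRecoveryCount == recruitedEpoch && dbState == RecoveryState::FULLY_RECOVERED);
	return displaced && !listedAsBackupWorker;
}

int main() {
	// A worker recruited in epoch 5 that the log system no longer lists is
	// removed once epoch 5 is fully recovered...
	assert(isDisplaced(5, RecoveryState::FULLY_RECOVERED, 5, false));
	// ...but not while it is still listed as a backup worker,
	assert(!isDisplaced(5, RecoveryState::FULLY_RECOVERED, 5, true));
	// and not before recovery has progressed that far.
	assert(!isDisplaced(5, RecoveryState::OTHER, 5, false));
	return 0;
}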
@@ -188,11 +180,13 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
 	state PromiseStream<Future<Void>> addActor;
 	state Future<Void> error = actorCollection(addActor.getFuture());
 	state Future<Void> dbInfoChange = Void();
+	state Future<Void> onDone = self.backupDone.onTrigger();
 
 	TraceEvent("BackupWorkerStart", interf.id())
 	    .detail("Tag", req.routerTag.toString())
 	    .detail("StartVersion", req.startVersion)
-	    .detail("LogEpoch", req.epoch);
+	    .detail("LogEpoch", req.recruitedEpoch)
+	    .detail("BackupEpoch", req.backupEpoch);
 	try {
 		addActor.send(pullAsyncData(&self));
 		addActor.send(uploadData(&self));
@@ -208,13 +202,13 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
 				TraceEvent("BackupWorkerHalted", interf.id()).detail("ReqID", req.requesterID);
 				break;
 			}
-			when(wait(checkRemoved(db, req.epoch, &self))) {
-				TraceEvent("BackupWorkerRemoved", interf.id());
-				break;
-			}
-			when(wait(self.backupDone.onTrigger())) {
+			when(wait(checkRemoved(db, req.recruitedEpoch, &self))) {}
+			when(wait(onDone)) {
+				// TODO: when backup is done, we don't exit because master would think
+				// this worker failed and start recovery. Should fix the protocol between
+				// this worker and master so that done/exit here doesn't trigger recovery.
 				TraceEvent("BackupWorkerDone", interf.id());
-				break;
+				onDone = Never();
 			}
 			when(wait(error)) {}
 		}
@@ -2174,15 +2174,15 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
 	if (req.ratekeeperInterf.present()) {
 		if((self->recruitingRatekeeperID.present() && self->recruitingRatekeeperID.get() != req.ratekeeperInterf.get().id()) ||
 		   self->clusterControllerDcId != w.locality.dcId()) {
-			TraceEvent("CCHaltRegisteringRatekeeper", self->id)
-			.detail("RKID", req.ratekeeperInterf.get().id())
-			.detail("DcID", printable(self->clusterControllerDcId))
-			.detail("ReqDcID", printable(w.locality.dcId()))
-			.detail("RecruitingRKID",
-			        self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID());
+			TraceEvent("CCHaltRegisteringRatekeeper", self->id)
+			    .detail("RKID", req.ratekeeperInterf.get().id())
+			    .detail("DcID", printable(self->clusterControllerDcId))
+			    .detail("ReqDcID", printable(w.locality.dcId()))
+			    .detail("RecruitingRKID",
+			            self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID());
 			self->id_worker[w.locality.processId()].haltRatekeeper = brokenPromiseToNever(
 			    req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
-		} else if(!self->recruitingRatekeeperID.present()) {
+		} else if (!self->recruitingRatekeeperID.present()) {
 			const RatekeeperInterface& rki = req.ratekeeperInterf.get();
 			const auto& ratekeeper = self->db.serverInfo->get().read().ratekeeper;
 			TraceEvent("CCRegisterRatekeeper", self->id).detail("RKID", rki.id());
@@ -172,7 +172,7 @@ struct DBCoreState {
 			serializer(ar, txsTags);
 		}
 		if (ar.protocolVersion().hasBackupWorker()) {
-			serializer(ar, epoch); // TODO: serialize epoch in higher version?
+			serializer(ar, epoch);
 		}
 	} else if(ar.isDeserializing) {
 		tLogs.push_back(CoreTLogSet());
@@ -1332,7 +1332,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
 
 	std::map<LogEpoch, Version> getEpochEndVersions() const override {
 		std::map<LogEpoch, Version> epochEndVersion;
-		for (const auto old : oldLogData) {
+		for (const auto& old : oldLogData) {
 			epochEndVersion[old.epoch] = old.epochEnd;
 		}
 		return epochEndVersion;
@@ -151,7 +151,8 @@ struct InitializeLogRouterRequest {
 struct InitializeBackupRequest {
 	constexpr static FileIdentifier file_identifier = 68354279;
 	UID reqId;
-	LogEpoch epoch;
+	LogEpoch recruitedEpoch;
+	LogEpoch backupEpoch;
 	Tag routerTag;
 	Version startVersion;
 	Optional<Version> endVersion;
@@ -162,7 +163,7 @@ struct InitializeBackupRequest {
 
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, reqId, epoch, routerTag, startVersion, endVersion, reply);
+		serializer(ar, reqId, recruitedEpoch, backupEpoch, routerTag, startVersion, endVersion, reply);
 	}
 };
 
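The single epoch field is split in two because a worker recruited in the current epoch may be backing up an older one: recruitedEpoch is the epoch of the recovery that recruited the worker (used by checkRemoved), while backupEpoch is the epoch whose mutations it backs up and under which saveProgress records its version. A rough sketch of the two recruitment cases, with a simplified stand-in struct rather than the real InitializeBackupRequest:

#include <cstdint>
#include <iostream>
#include <limits>

using LogEpoch = int64_t;
using Version = int64_t;

// Simplified stand-in: recruitedEpoch is the epoch in which the worker is recruited
// (the current recovery count); backupEpoch is the epoch whose mutations it backs up
// and under which it records its progress.
struct BackupRequestSketch {
	LogEpoch recruitedEpoch;
	LogEpoch backupEpoch;
	Version startVersion;
	Version endVersion;
};

int main() {
	const LogEpoch currentEpoch = 7;

	// Worker for the current epoch: both fields are the current epoch, end open-ended.
	BackupRequestSketch current{ currentEpoch, currentEpoch, /*startVersion=*/1000,
		                         std::numeric_limits<Version>::max() };

	// Backfill worker for old epoch 5 that ended at version 500 with progress saved to 400:
	// it is recruited now, but backs up (and reports progress for) epoch 5 only.
	BackupRequestSketch backfill{ currentEpoch, /*backupEpoch=*/5, /*startVersion=*/401,
		                          /*endVersion=*/499 };

	std::cout << current.recruitedEpoch << " " << backfill.backupEpoch << "\n";
	return 0;
}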
@@ -1233,25 +1233,39 @@ ACTOR Future<Void> configurationMonitor( Reference<MasterData> self ) {
 	}
 }
 
-ACTOR Future<std::vector<std::tuple<UID, LogEpoch, Version>>> getBackupProgress(Reference<MasterData> self) {
+void addBackupStatus(std::map<LogEpoch, std::map<Tag, Version>>& progress, const WorkerBackupStatus& status) {
+	auto& it = progress[status.epoch];
+	auto lb = it.lower_bound(status.tag);
+	if (lb != it.end() && status.tag == lb->first) {
+		if (lb->second < status.version) {
+			lb->second = status.version;
+		}
+	} else {
+		it.insert(lb, { status.tag, status.version });
+	}
+}
+
+ACTOR Future<std::map<LogEpoch, std::map<Tag, Version>>> getBackupProgress(Reference<MasterData> self) {
 	state Database cx = openDBOnServer(self->dbInfo, TaskPriority::DefaultEndpoint, true, true);
 	state Transaction tr(cx);
 
 	loop {
 		try {
-			state std::vector<std::tuple<UID, LogEpoch, Version>> progress;
 			tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
 			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
 			Standalone<RangeResultRef> results = wait(tr.getRange(backupProgressKeys, CLIENT_KNOBS->TOO_MANY));
 			ASSERT(!results.more && results.size() < CLIENT_KNOBS->TOO_MANY);
 
+			state std::map<LogEpoch, std::map<Tag, Version>> progress;
 			for (auto& it : results) {
-				UID workerID = decodeBackupProgressKey(it.key);
-				LogEpoch recoveryCount;
-				Version v;
-				decodeBackupProgressValue(it.value, recoveryCount, v);
-				progress.emplace_back(workerID, recoveryCount, v);
-				TraceEvent("GotBackupProgress", self->dbgid).detail("W", workerID).detail("Epoch", recoveryCount).detail("Version", v);
+				const UID workerID = decodeBackupProgressKey(it.key);
+				const WorkerBackupStatus status = decodeBackupProgressValue(it.value);
+				addBackupStatus(progress, status);
+				TraceEvent("GotBackupProgress", self->dbgid)
+				    .detail("W", workerID)
+				    .detail("Epoch", status.epoch)
+				    .detail("Version", status.version)
+				    .detail("Tag", status.tag.toString());
 			}
 			wait(tr.commit());
 			return progress;
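addBackupStatus folds the rows read from the backupProgress key range into a per-epoch map keyed by tag, keeping the highest saved version when two entries report the same tag. A simplified, self-contained illustration of that aggregation (plain integer types, hypothetical names):

#include <cassert>
#include <cstdint>
#include <map>

using Epoch = int64_t;
using TagId = int;
using Version = int64_t;

// Same shape as the aggregation in addBackupStatus: per epoch, keep the highest
// saved version reported for each tag.
void addStatus(std::map<Epoch, std::map<TagId, Version>>& progress, Epoch epoch, TagId tag,
               Version savedVersion) {
	auto& tags = progress[epoch];
	auto it = tags.find(tag);
	if (it == tags.end() || it->second < savedVersion) {
		tags[tag] = savedVersion;
	}
}

int main() {
	std::map<Epoch, std::map<TagId, Version>> progress;
	// Two workers reported progress for epoch 3, tag 0; the higher version wins.
	addStatus(progress, 3, 0, 350);
	addStatus(progress, 3, 0, 400);
	addStatus(progress, 3, 1, 499);
	assert(progress[3][0] == 400);
	assert(progress[3][1] == 499);
	return 0;
}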
@@ -1261,20 +1275,58 @@ ACTOR Future<std::vector<std::tuple<UID, LogEpoch, Version>>> getBackupProgress(
 		}
 	}
 
+// for each old epoch:
+//   if savedVersion < epochEnd - 1 = knownCommittedVersion
+//     recover/backup [savedVersion + 1, epochEnd - 1]
+//
+// Returns a map of pair<Epoch, EpochEndVersion> : map<tag, savedVersion>, so that
+// the backup range should be [savedVersion + 1, EpochEndVersion).
+std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> getUnfinishedBackup(
+    const UID dbgid, const std::map<LogEpoch, std::map<Tag, Version>>& progress,
+    const std::map<LogEpoch, Version>& epochEndVersions) {
+	std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit;
+	for (const auto& it : progress) {
+		const LogEpoch epoch = it.first;
+		auto versionIt = epochEndVersions.find(epoch);
+		if (versionIt != epochEndVersions.end()) {
+			const Version endVersion = versionIt->second;
+			std::map<Tag, Version> tagVersions;
+			for (const auto& tv : it.second) {
+				if (tv.second < endVersion - 1) {
+					tagVersions.insert({ tv.first, tv.second });
+					TraceEvent("BW", dbgid)
+					    .detail("OldEpoch", epoch)
+					    .detail("Tag", tv.first.toString())
+					    .detail("Version", tv.second)
+					    .detail("EpochEndVersion", endVersion);
+				}
+			}
+			if (!tagVersions.empty()) {
+				toRecruit[{ epoch, endVersion }] = tagVersions;
+			}
+		} else {
+			TraceEvent("BW", dbgid).detail("SkipEpoch", epoch);
+		}
+	}
+	return toRecruit;
+}
+
 ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self) {
 	if (self->backupWorkers.size() == 0) return Void();
 
-	LogEpoch epoch = self->cstate.myDBState.recoveryCount;
-	state Future<std::vector<std::tuple<UID, LogEpoch, Version>>> backupProgress = getBackupProgress(self);
-	state std::map<LogEpoch, Version> epochEndVersion = self->logSystem->getEpochEndVersions();
+	state LogEpoch epoch = self->cstate.myDBState.recoveryCount;
+	state Future<std::map<LogEpoch, std::map<Tag, Version>>> backupProgress = getBackupProgress(self);
+	state std::map<LogEpoch, Version> epochEndVersions = self->logSystem->getEpochEndVersions();
 
-	std::vector<Future<BackupInterface>> initializationReplies;
+	state std::vector<Future<BackupInterface>> initializationReplies;
 	const int logRouterTags = self->logSystem->getLogRouterTags();
 	const Version startVersion = self->logSystem->getStartVersion();
-	for (int i = 0; i < logRouterTags; i++) {
+	state int i = 0;
+	for (; i < logRouterTags; i++) {
 		const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
 		InitializeBackupRequest req(deterministicRandom()->randomUniqueID());
-		req.epoch = epoch;
+		req.recruitedEpoch = epoch;
+		req.backupEpoch = epoch;
 		req.routerTag = Tag(tagLocalityLogRouter, i);
 		req.startVersion = startVersion;
 		TraceEvent("BackupRecruitment", self->dbgid)
@@ -1287,38 +1339,40 @@ ACTOR static Future<Void> recruitBackupWorkers(Reference<MasterData> self) {
 		    master_recovery_failed()));
 	}
 
+	std::map<LogEpoch, std::map<Tag, Version>> progress = wait(backupProgress);
+	std::map<std::pair<LogEpoch, Version>, std::map<Tag, Version>> toRecruit =
+	    getUnfinishedBackup(self->dbgid, progress, epochEndVersions);
+	for (const auto& epochData : toRecruit) {
+		const LogEpoch backupEpoch = epochData.first.first;
+		const Version epochEndVersion = epochData.first.second;
+		for (const auto& tagVersion : epochData.second) {
+			const auto& worker = self->backupWorkers[i % self->backupWorkers.size()];
+			i++;
+			InitializeBackupRequest req(deterministicRandom()->randomUniqueID());
+			req.recruitedEpoch = epoch;
+			req.backupEpoch = backupEpoch;
+			req.routerTag = tagVersion.first;
+			req.startVersion = tagVersion.second + 1; // savedVersion + 1
+			req.endVersion = epochEndVersion - 1;
+			TraceEvent("BackupRecruitment", self->dbgid)
+			    .detail("WorkerID", worker.id())
+			    .detail("Epoch", epoch)
+			    .detail("BackupEpoch", backupEpoch)
+			    .detail("StartVersion", req.startVersion);
+			initializationReplies.push_back(transformErrors(
+			    throwErrorOr(worker.backup.getReplyUnlessFailedFor(req, SERVER_KNOBS->BACKUP_TIMEOUT,
+			                                                       SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY)),
+			    master_recovery_failed()));
+		}
+	}
+
 	std::vector<BackupInterface> newRecruits = wait(getAll(initializationReplies));
-	std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
+	state std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
 	for (const auto& interf : newRecruits) {
 		backupWorkers.emplace_back(
 		    new AsyncVar<OptionalInterface<BackupInterface>>(OptionalInterface<BackupInterface>(interf)));
 	}
 	self->logSystem->setBackupWorkers(backupWorkers);
-
-	// get a map of UID -> recoveryCount, savedVersion
-	// for each old epoch:
-	//   if savedVersion >= epochEnd - 1 = knownCommittedVersion
-	//     skip or remove this entry
-	//   else
-	//     recover/backup [savedVersion + 1, epochEnd - 1]
-	//     use the old recoveryCount and tag. Make sure old worker stopped.
-
-	state std::vector<std::tuple<UID, LogEpoch, Version>> progress = wait(backupProgress);
-	for (const auto& t : progress) {
-		const UID uid = std::get<0>(t);
-		const LogEpoch epoch = std::get<1>(t);
-		const Version version = std::get<2>(t);
-		auto it = epochEndVersion.find(epoch);
-		if (it != epochEndVersion.end()) {
-			TraceEvent("BW", self->dbgid)
-			    .detail("UID", uid)
-			    .detail("Epoch", epoch)
-			    .detail("Version", version)
-			    .detail("BackedupVersion", it->second);
-		} else {
-			TraceEvent("BW", self->dbgid).detail("UID", uid).detail("Epoch", epoch).detail("Version", version);
-		}
-	}
 	TraceEvent("BackupRecruitmentDone", self->dbgid);
 	return Void();
 }