Remove old backup workers when done

When a backup worker for an old epoch finishes its work, it notifies the master.
The master then removes the worker from the log system and acknowledges back so
that the worker can gracefully shut down.
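
Condensed from the hunks below (Flow actor code, simplified; the real logic lives in the backup worker's main loop and in the master server's request loop):

// Backup worker side (see the backupWorker() hunk below): once done, tell the
// master and wait for the acknowledgement before leaving the main loop.
wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
    BackupWorkerDoneRequest(self.myId, self.backupEpoch))));

// Master side (see the masterserver hunk below): remove the worker from the
// log system, re-register so the change propagates, then acknowledge.
when(BackupWorkerDoneRequest req = waitNext(mi.notifyBackupWorkerDone.getFuture())) {
    if (self->logSystem->removeBackupWorker(req)) {
        self->registrationTrigger.trigger();
    }
    req.reply.send(Void());
}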

A backup worker's popping is stalled while workers from older epochs are still
working; otherwise, those older-epoch workers would lose data.
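
The stall is keyed off the log system's oldest backup epoch, which is recomputed whenever a finished worker is removed (see the TagPartitionedLogSystem hunks below); roughly:

// Sketch of the recomputation (LogEpoch, oldLogData and backupWorkers are the
// FDB log system members shown in the hunks below). A worker whose backupEpoch
// is newer than this value defers its pop(), so older epochs never lose data.
LogEpoch oldestBackupEpoch = epoch; // current epoch unless an older one is unfinished
for (const auto& old : oldLogData) {
    if (old.epoch < oldestBackupEpoch && !old.tLogs[0]->backupWorkers.empty()) {
        oldestBackupEpoch = old.epoch; // an older epoch still has backup workers
    }
}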

However, allowing a newer epoch to start backing up before older epochs finish
can leave holes in the covered version ranges. The restore process must verify
the backup progress to make sure there are no holes; otherwise it has to wait.
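
That restore-side check is not part of this commit; the following is only a hypothetical sketch of the kind of verification it would need (VersionRange, isContiguous, and the version bounds are illustrative names, not FDB APIs):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical illustration: verify that the saved backup ranges cover
// [beginVersion, endVersion) with no holes before restore proceeds.
struct VersionRange { int64_t begin, end; }; // half-open [begin, end)

bool isContiguous(std::vector<VersionRange> ranges, int64_t beginVersion, int64_t endVersion) {
    std::sort(ranges.begin(), ranges.end(),
              [](const VersionRange& a, const VersionRange& b) { return a.begin < b.begin; });
    int64_t next = beginVersion;
    for (const auto& r : ranges) {
        if (r.begin > next) return false; // hole found; restore has to wait
        next = std::max(next, r.end);
    }
    return next >= endVersion; // the whole interval is covered
}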
Jingyu Zhou 2019-08-14 17:00:20 -07:00
parent 85c4a4e422
commit 0c08161d8e
6 changed files with 105 additions and 30 deletions

View File

@ -59,6 +59,16 @@ struct BackupData {
}
void pop() {
const LogEpoch oldest = logSystem.get()->getOldestBackupEpoch();
if (backupEpoch > oldest) {
// Defer pop if old epoch hasn't finished popping yet.
TraceEvent("BackupWorkerPopDeferred", myId)
.suppressFor(1.0)
.detail("BackupEpoch", backupEpoch)
.detail("OldestEpoch", oldest)
.detail("Version", savedVersion);
return;
}
const Tag popTag = logSystem.get()->getPseudoPopTag(tag, ProcessClass::BackupClass);
logSystem.get()->pop(savedVersion, popTag);
}
@ -223,24 +233,22 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
when(wait(dbInfoChange)) {
dbInfoChange = db->onChange();
Reference<ILogSystem> ls = ILogSystem::fromServerDBInfo(self.myId, db->get(), true);
if (ls && ls->hasPseudoLocality(tagLocalityBackup)) {
bool hasPseudoLocality = ls.isValid() && ls->hasPseudoLocality(tagLocalityBackup);
if (hasPseudoLocality) {
self.logSystem.set(ls);
self.pop();
TraceEvent("BackupWorkerLogSystem", interf.id())
.detail("HasBackupLocality", true)
.detail("Tag", self.tag.toString());
} else {
TraceEvent("BackupWorkerLogSystem", interf.id())
.detail("HasBackupLocality", false)
.detail("Tag", self.tag.toString());
}
TraceEvent("BackupWorkerLogSystem", interf.id())
.detail("HasBackupLocality", hasPseudoLocality)
.detail("Tag", self.tag.toString());
}
when(wait(onDone)) {
// TODO: when backup is done, we don't exit because master would think
// this worker failed and start recovery. Should fix the protocol between
// this worker and master so that done/exit here doesn't trigger recovery.
TraceEvent("BackupWorkerDone", interf.id());
onDone = Never();
TraceEvent("BackupWorkerDone", interf.id()).detail("BackupEpoch", self.backupEpoch);
// Notify master so that this worker can be removed from log system, then this
// worker (for an old epoch's unfinished work) can safely exit.
wait(brokenPromiseToNever(db->get().master.notifyBackupWorkerDone.getReply(
BackupWorkerDoneRequest(self.myId, self.endVersion))));
break;
}
when(wait(error)) {}
}

View File

@ -763,7 +763,13 @@ struct ILogSystem {
virtual Version popPseudoLocalityTag(Tag tag, Version upTo) = 0;
virtual void setBackupWorkers(
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers) = 0;
const std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>>& backupWorkers) = 0;
// Removes a finished backup worker from log system and returns true. Returns false
// if the worker is not found.
virtual bool removeBackupWorker(const BackupWorkerDoneRequest& req) = 0;
virtual LogEpoch getOldestBackupEpoch() const = 0;
};
struct LengthPrefixedStringRef {

View File

@ -230,9 +230,11 @@ struct LogSystemConfig {
Optional<Version> recoveredAt;
std::set<int8_t> pseudoLocalities;
LogEpoch epoch;
LogEpoch oldestBackupEpoch;
LogSystemConfig(LogEpoch e = 0)
: logSystemType(LogSystemType::empty), logRouterTags(0), txsTags(0), expectedLogSets(0), stopped(false), epoch(e) {}
: logSystemType(LogSystemType::empty), logRouterTags(0), txsTags(0), expectedLogSets(0), stopped(false), epoch(e),
oldestBackupEpoch(e) {}
std::string toString() const {
return format("type: %d oldGenerations: %d tags: %d %s", logSystemType, oldTLogs.size(), logRouterTags, describe(tLogs).c_str());
@ -371,9 +373,9 @@ struct LogSystemConfig {
bool isEqual(LogSystemConfig const& r) const {
return logSystemType == r.logSystemType && tLogs == r.tLogs && oldTLogs == r.oldTLogs &&
expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags &&
txsTags == r.txsTags && recruitmentID == r.recruitmentID && stopped == r.stopped &&
recoveredAt == r.recoveredAt && pseudoLocalities == r.pseudoLocalities && epoch == r.epoch;
expectedLogSets == r.expectedLogSets && logRouterTags == r.logRouterTags && txsTags == r.txsTags &&
recruitmentID == r.recruitmentID && stopped == r.stopped && recoveredAt == r.recoveredAt &&
pseudoLocalities == r.pseudoLocalities && epoch == r.epoch && oldestBackupEpoch == r.oldestBackupEpoch;
}
bool isEqualIds(LogSystemConfig const& r) const {
@ -462,7 +464,7 @@ struct LogSystemConfig {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, logSystemType, tLogs, logRouterTags, oldTLogs, expectedLogSets, recruitmentID, stopped,
recoveredAt, pseudoLocalities, txsTags, epoch);
recoveredAt, pseudoLocalities, txsTags, epoch, oldestBackupEpoch);
}
};

View File

@ -37,6 +37,7 @@ struct MasterInterface {
RequestStream< struct TLogRejoinRequest > tlogRejoin; // sent by tlog (whether or not rebooted) to communicate with a new master
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
RequestStream< struct GetCommitVersionRequest > getCommitVersion;
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
@ -44,9 +45,9 @@ struct MasterInterface {
template <class Archive>
void serialize(Archive& ar) {
if constexpr (!is_fb_function<Archive>) {
ASSERT( ar.protocolVersion().isValid() );
}
serializer(ar, locality, waitFailure, tlogRejoin, changeCoordinators, getCommitVersion);
ASSERT(ar.protocolVersion().isValid());
}
serializer(ar, locality, waitFailure, tlogRejoin, changeCoordinators, getCommitVersion, notifyBackupWorkerDone);
}
void initEndpoints() {
@ -157,6 +158,21 @@ struct GetCommitVersionRequest {
}
};
struct BackupWorkerDoneRequest {
constexpr static FileIdentifier file_identifier = 8736351;
UID workerUID;
LogEpoch backupEpoch;
ReplyPromise<Void> reply;
BackupWorkerDoneRequest() : workerUID(), backupEpoch(-1) {}
BackupWorkerDoneRequest(UID id, LogEpoch epoch) : workerUID(id), backupEpoch(epoch) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, workerUID, backupEpoch, reply);
}
};
struct LifetimeToken {
UID ccID;
int64_t count;

View File

@ -179,6 +179,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
bool stopped;
std::set<int8_t> pseudoLocalities; // Represent special localities that will be mapped to tagLocalityLogRouter
const LogEpoch epoch;
LogEpoch oldestBackupEpoch;
// new members
std::map<Tag, Version> pseudoLocalityPopVersion;
@ -205,9 +206,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
TagPartitionedLogSystem(UID dbgid, LocalityData locality, LogEpoch e,
Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>())
: dbgid(dbgid), logSystemType(LogSystemType::empty), expectedLogSets(0), logRouterTags(0), txsTags(0),
repopulateRegionAntiQuorum(0), epoch(e), recoveryCompleteWrittenToCoreState(false), locality(locality),
remoteLogsWrittenToCoreState(false), hasRemoteServers(false), stopped(false), addActor(addActor),
popActors(false) {}
repopulateRegionAntiQuorum(0), epoch(e), oldestBackupEpoch(e), recoveryCompleteWrittenToCoreState(false),
locality(locality), remoteLogsWrittenToCoreState(false), hasRemoteServers(false), stopped(false),
addActor(addActor), popActors(false) {}
virtual void stopRejoins() {
rejoins = Future<Void>();
@ -1230,7 +1231,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return newEpoch( Reference<TagPartitionedLogSystem>::addRef(this), recr, fRemoteWorkers, config, recoveryCount, primaryLocality, remoteLocality, allTags, recruitmentStalled );
}
virtual LogSystemConfig getLogSystemConfig() {
LogSystemConfig getLogSystemConfig() override {
LogSystemConfig logSystemConfig(epoch);
logSystemConfig.logSystemType = logSystemType;
logSystemConfig.expectedLogSets = expectedLogSets;
@ -1240,6 +1241,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
logSystemConfig.stopped = stopped;
logSystemConfig.recoveredAt = recoveredAt;
logSystemConfig.pseudoLocalities = pseudoLocalities;
logSystemConfig.oldestBackupEpoch = oldestBackupEpoch;
for (const Reference<LogSet>& logSet : tLogs) {
if (logSet->isLocal || remoteLogsWrittenToCoreState) {
logSystemConfig.tLogs.emplace_back(*logSet);
@ -1249,9 +1251,6 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(!recoveryCompleteWrittenToCoreState.get()) {
for (const auto& oldData : oldLogData) {
logSystemConfig.oldTLogs.emplace_back(oldData);
//TraceEvent("BWGetLSConf")
// .detail("Epoch", logSystemConfig.oldTLogs.back().epoch)
// .detail("Version", logSystemConfig.oldTLogs.back().epochEnd);
}
}
return logSystemConfig;
@ -1363,15 +1362,18 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
return Reference<LogSet>(nullptr);
}
void setBackupWorkers(std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers) override {
void setBackupWorkers(
const std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>>& backupWorkers) override {
ASSERT(tLogs.size() > 0);
Reference<LogSet> logset = tLogs[0]; // Master recruits this epoch's worker first.
LogEpoch logsetEpoch = this->epoch;
oldestBackupEpoch = this->epoch;
for (const auto& worker : backupWorkers) {
if (worker->get().interf().backupEpoch != logsetEpoch) {
// find the logset from oldLogData
logsetEpoch = worker->get().interf().backupEpoch;
oldestBackupEpoch = std::min(oldestBackupEpoch, logsetEpoch);
logset = getEpochLogSet(logsetEpoch);
ASSERT(logset.isValid());
}
@ -1380,6 +1382,40 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
backupWorkerChanged.trigger();
}
bool removeBackupWorker(const BackupWorkerDoneRequest& req) override {
bool removed = false;
Reference<LogSet> logset = getEpochLogSet(req.backupEpoch);
std::string msg("BackupWorkerNotFound");
if (logset.isValid()) {
for (auto it = logset->backupWorkers.begin(); it != logset->backupWorkers.end(); it++) {
if (it->getPtr()->get().interf().id() == req.workerUID) {
msg = "BackupWorkerRemoved";
logset->backupWorkers.erase(it);
removed = true;
break;
}
}
}
if (removed) {
oldestBackupEpoch = epoch;
for (const auto& old : oldLogData) {
if (old.epoch < oldestBackupEpoch && old.tLogs[0]->backupWorkers.size() > 0) {
oldestBackupEpoch = old.epoch;
}
}
backupWorkerChanged.trigger();
}
TraceEvent(msg.c_str(), dbgid)
.detail("BackupEpoch", req.backupEpoch)
.detail("WorkerID", req.workerUID)
.detail("OldestBackupEpoch", oldestBackupEpoch);
return removed;
}
LogEpoch getOldestBackupEpoch() const override { return oldestBackupEpoch; }
ACTOR static Future<Void> monitorLog(Reference<AsyncVar<OptionalInterface<TLogInterface>>> logServer, Reference<AsyncVar<bool>> failed) {
state Future<Void> waitFailure;
loop {

View File

@ -29,6 +29,7 @@
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/Knobs.h"
#include <iterator>
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ClusterRecruitmentInterface.h"
@ -1626,6 +1627,12 @@ ACTOR Future<Void> masterServer( MasterInterface mi, Reference<AsyncVar<ServerDB
throw worker_removed();
}
}
when(BackupWorkerDoneRequest req = waitNext(mi.notifyBackupWorkerDone.getFuture())) {
if (self->logSystem->removeBackupWorker(req)) {
self->registrationTrigger.trigger();
}
req.reply.send(Void());
}
when (wait(collection) ) { ASSERT(false); throw internal_error(); }
}
} catch (Error& e) {