Add actors to store halt request futures

Compare against the best available fitness in the datacenter, rather than the master's fitness, when checking for a better DD or RK.
This commit is contained in:
Jingyu Zhou 2019-03-22 18:00:16 -07:00
parent e8977aeb98
commit 12917d8c7d
1 changed file with 23 additions and 10 deletions

View File

@ -55,6 +55,8 @@ struct WorkerInfo : NonCopyable {
ProcessClass initialClass;
ClusterControllerPriorityInfo priorityInfo;
WorkerDetails details;
Future<Void> haltRatekeeper;
Future<Void> haltDistributor;
WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :
@ -412,6 +414,17 @@ public:
}
}
ProcessClass::Fitness getBestFitnessForRoleInDatacenter(ProcessClass::ClusterRole role) {
ProcessClass::Fitness bestFitness = ProcessClass::NeverAssign;
for (const auto& it : id_worker) {
if (it.second.priorityInfo.isExcluded || it.second.details.interf.locality.dcId() != clusterControllerDcId) {
continue;
}
bestFitness = std::min(bestFitness, it.second.details.processClass.machineClassFitness(role));
}
return bestFitness;
}
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<WorkerDetails>> fitness_workers;
@ -1378,17 +1391,17 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
auto& masterWorker = self->id_worker[self->masterProcessId.get()];
const ServerDBInfo& db = self->db.serverInfo->get();
auto masterFitnessForRK = masterWorker.details.processClass.machineClassFitness(ProcessClass::RateKeeper);
auto masterFitnessForDD = masterWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::RateKeeper);
auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor);
if (!self->recruitingRatekeeperID.present() && db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId())) {
auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::RateKeeper);
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) ||
rkFitness > masterFitnessForRK || rkWorker.priorityInfo.isExcluded) {
rkFitness > bestFitnessForRK || rkWorker.priorityInfo.isExcluded) {
TraceEvent("CC_HaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
.detail("Excluded", rkWorker.priorityInfo.isExcluded)
.detail("Fitness", rkFitness).detail("MasterFitness", masterFitnessForRK);
.detail("Fitness", rkFitness).detail("BestFitness", bestFitnessForRK);
self->recruitRatekeeper.trigger();
}
}
@ -1397,11 +1410,11 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) ||
ddFitness > masterFitnessForDD || ddWorker.priorityInfo.isExcluded) {
ddFitness > bestFitnessForDD || ddWorker.priorityInfo.isExcluded) {
TraceEvent("CC_HaltDD", self->id).detail("DDID", db.distributor.get().id())
.detail("Excluded", ddWorker.priorityInfo.isExcluded)
.detail("Fitness", ddFitness).detail("MasterFitness", masterFitnessForDD);
self->addActor.send(brokenPromiseToNever(db.distributor.get().haltDataDistributor.getReply(HaltDataDistributorRequest(self->id))));
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD);
self->id_worker[db.distributor.get().locality.processId()].haltDistributor = brokenPromiseToNever(db.distributor.get().haltDataDistributor.getReply(HaltDataDistributorRequest(self->id)));
}
}
}
@ -1882,7 +1895,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
.detail("DcID", printable(self->clusterControllerDcId))
.detail("ReqDcID", printable(req.ratekeeperInterf.get().locality.dcId()))
.detail("RecruitingRKID", self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID());
self->addActor.send(brokenPromiseToNever(req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))));
self->id_worker[req.ratekeeperInterf.get().locality.processId()].haltRatekeeper = brokenPromiseToNever(req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
} else if(!self->recruitingRatekeeperID.present()) {
const RatekeeperInterface& rki = req.ratekeeperInterf.get();
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
@ -1892,7 +1905,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
.detail("DcID", printable(self->clusterControllerDcId))
.detail("ReqDcID", printable(req.ratekeeperInterf.get().locality.dcId()))
.detail("RecruitingRKID", self->recruitingRatekeeperID.present() ? self->recruitingRatekeeperID.get() : UID());
self->addActor.send(brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))));
self->id_worker[ratekeeper.get().locality.processId()].haltRatekeeper = brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
}
if(!ratekeeper.present() || ratekeeper.get().id() != rki.id()) {
self->db.setRatekeeper(rki);
@ -2530,7 +2543,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
if (ratekeeper.present() && ratekeeper.get().id() != interf.get().id()) {
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id())
.detail("DcID", printable(self->clusterControllerDcId));
self->addActor.send(brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))));
self->id_worker[ratekeeper.get().locality.processId()].haltRatekeeper = brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id)));
}
if(!ratekeeper.present() || ratekeeper.get().id() != interf.get().id()) {
self->db.setRatekeeper(interf.get());