Move ratekeeper or data distributor from excluded servers

This commit is contained in:
Jingyu Zhou 2019-03-21 17:07:12 -07:00
parent 48324ad4be
commit 299961aecb
1 changed files with 15 additions and 13 deletions

View File

@ -1385,8 +1385,9 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::RateKeeper);
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) ||
rkFitness > masterFitnessForRK) {
rkFitness > masterFitnessForRK || rkWorker.priorityInfo.isExcluded) {
TraceEvent("CC_HaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
.detail("Excluded", rkWorker.priorityInfo.isExcluded)
.detail("Fitness", rkFitness).detail("MasterFitness", masterFitnessForRK);
self->recruitRatekeeper.trigger();
}
@ -1396,8 +1397,9 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) ||
ddFitness > masterFitnessForDD) {
ddFitness > masterFitnessForDD || ddWorker.priorityInfo.isExcluded) {
TraceEvent("CC_HaltDD", self->id).detail("DDID", db.distributor.get().id())
.detail("Excluded", ddWorker.priorityInfo.isExcluded)
.detail("Fitness", ddFitness).detail("MasterFitness", masterFitnessForDD);
self->addActor.send(brokenPromiseToNever(db.distributor.get().haltDataDistributor.getReply(HaltDataDistributorRequest(self->id))));
}
@ -1847,20 +1849,20 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->clusterControllerDcId == req.distributorInterf.get().locality.dcId() &&
!self->recruitingDistributor) {
const DataDistributorInterface& di = req.distributorInterf.get();
TraceEvent("ClusterController_RegisterDataDistributor", self->id).detail("DDID", di.id());
TraceEvent("CC_RegisterDataDistributor", self->id).detail("DDID", di.id());
self->db.setDistributor(di);
}
if (req.ratekeeperInterf.present()) {
if (self->clusterControllerDcId == req.ratekeeperInterf.get().locality.dcId() &&
(!self->recruitingRatekeeperID.present() || self->recruitingRatekeeperID.get() == req.ratekeeperInterf.get().id())) {
const RatekeeperInterface& rki = req.ratekeeperInterf.get();
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", rki.id());
if (self->db.serverInfo->get().ratekeeper.present()) {
TraceEvent("CC_HaltRatekeeper", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
self->db.serverInfo->get().ratekeeper.get().haltRatekeeper.send(HaltRatekeeperRequest(self->id));
if (ratekeeper.present() && ratekeeper.get().id() != rki.id()) {
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id());
ratekeeper.get().haltRatekeeper.send(HaltRatekeeperRequest(self->id));
}
if (self->recruitingRatekeeperID.present() && self->recruitingRatekeeperID.get() == rki.id()) {
if (self->recruitingRatekeeperID.present()) {
self->recruitingRatekeeperID = Optional<UID>();
}
self->db.setRatekeeper(rki);
@ -2436,16 +2438,16 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
worker = self->id_worker[self->masterProcessId.get()].details;
}
InitializeDataDistributorRequest req(g_random->randomUniqueID());
TraceEvent("ClusterController_DataDistributorRecruit", self->id).detail("Addr", worker.interf.address());
TraceEvent("CC_DataDistributorRecruit", self->id).detail("Addr", worker.interf.address());
ErrorOr<DataDistributorInterface> distributor = wait( worker.interf.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
if (distributor.present()) {
TraceEvent("ClusterController_DataDistributorRecruited", self->id).detail("Addr", worker.interf.address());
TraceEvent("CC_DataDistributorRecruited", self->id).detail("Addr", worker.interf.address());
return distributor.get();
}
}
catch (Error& e) {
TraceEvent("ClusterController_DataDistributorRecruitError", self->id).error(e);
TraceEvent("CC_DataDistributorRecruitError", self->id).error(e);
if ( e.code() != error_code_no_more_servers ) {
throw;
}
@ -2462,8 +2464,8 @@ ACTOR Future<Void> monitorDataDistributor(ClusterControllerData *self) {
when ( wait(initialDelay) ) { break; }
when ( wait(self->db.serverInfo->onChange()) ) { // Rejoins via worker registration
if ( self->db.serverInfo->get().distributor.present() ) {
TraceEvent("ClusterController_InfoChange", self->id)
.detail("DataDistributorID", self->db.serverInfo->get().distributor.get().id());
TraceEvent("CC_InfoChange", self->id)
.detail("DDID", self->db.serverInfo->get().distributor.get().id());
break;
}
}