Avoid unnecessary recruiting of DD or RK
While waiting to recruit a data distributor or ratekeeper, a previous one could have already joined, so we can skip the unnecessary recruiting. Revert the change to worker.actor.cpp for ratekeeper. Instead, recruiting a ratekeeper should avoid the process with an existing one. This fixes a bug where the ratekeeper interface became a zombie, killing other healthy ratekeepers but doing no useful work. Found by: -r simulation --crash -f tests/fast/WriteDuringRead.txt -s 31858110 -b on
This commit is contained in:
parent
299961aecb
commit
da338c3ad6
|
@ -1860,7 +1860,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
|
|||
TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", rki.id());
|
||||
if (ratekeeper.present() && ratekeeper.get().id() != rki.id()) {
|
||||
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", ratekeeper.get().id());
|
||||
ratekeeper.get().haltRatekeeper.send(HaltRatekeeperRequest(self->id));
|
||||
self->addActor.send(brokenPromiseToNever(ratekeeper.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))));
|
||||
}
|
||||
if (self->recruitingRatekeeperID.present()) {
|
||||
self->recruitingRatekeeperID = Optional<UID>();
|
||||
|
@ -1868,7 +1868,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
|
|||
self->db.setRatekeeper(rki);
|
||||
} else {
|
||||
TraceEvent("CC_HaltRatekeeper", self->id).detail("RKID", req.ratekeeperInterf.get().id());
|
||||
req.ratekeeperInterf.get().haltRatekeeper.send(HaltRatekeeperRequest(self->id));
|
||||
self->addActor.send(brokenPromiseToNever(req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))));
|
||||
}
|
||||
}
|
||||
if( info == self->id_worker.end() ) {
|
||||
|
@ -2427,9 +2427,13 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
|
|||
|
||||
loop {
|
||||
try {
|
||||
state bool no_distributor = !self->db.serverInfo->get().distributor.present();
|
||||
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
|
||||
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
|
||||
}
|
||||
if (no_distributor && self->db.serverInfo->get().distributor.present()) {
|
||||
return self->db.serverInfo->get().distributor.get();
|
||||
}
|
||||
|
||||
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
|
||||
WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
|
||||
|
@ -2491,9 +2495,14 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
|
|||
|
||||
loop {
|
||||
try {
|
||||
state bool no_ratekeeper = !self->db.serverInfo->get().ratekeeper.present();
|
||||
while (!self->masterProcessId.present() || self->masterProcessId != self->db.serverInfo->get().master.locality.processId() || self->db.serverInfo->get().recoveryState < RecoveryState::ACCEPTING_COMMITS) {
|
||||
wait(self->db.serverInfo->onChange() || delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
|
||||
}
|
||||
if (no_ratekeeper && self->db.serverInfo->get().ratekeeper.present()) {
|
||||
// Existing ratekeeper registers while waiting, so skip.
|
||||
return Void();
|
||||
}
|
||||
|
||||
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
|
||||
WorkerFitnessInfo rkWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::RateKeeper, ProcessClass::NeverAssign, self->db.config, id_used);
|
||||
|
@ -2502,6 +2511,9 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
|
|||
if (self->onMasterIsBetter(worker, ProcessClass::RateKeeper)) {
|
||||
worker = self->id_worker[self->masterProcessId.get()].details;
|
||||
}
|
||||
if (self->db.serverInfo->get().ratekeeper.present() && self->db.serverInfo->get().ratekeeper.get().locality.processId() == worker.interf.locality.processId()) {
|
||||
throw no_more_servers(); // Avoid recruiting an existing one.
|
||||
}
|
||||
self->recruitingRatekeeperID = req.reqId;
|
||||
TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", worker.interf.address()).detail("RKID", req.reqId);
|
||||
|
||||
|
@ -2563,7 +2575,7 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
|
|||
}
|
||||
when ( wait(ratekeeperFailed) ) {
|
||||
ratekeeperFailed = Never();
|
||||
TraceEvent("CC_RateKeeperDied", self->id)
|
||||
TraceEvent("CC_RatekeeperDied", self->id)
|
||||
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
|
||||
self->db.clearInterf(ProcessClass::RateKeeperClass);
|
||||
}
|
||||
|
|
|
@ -597,6 +597,7 @@ ACTOR Future<Void> monitorServerDBInfo( Reference<AsyncVar<Optional<ClusterContr
|
|||
choose {
|
||||
when( ServerDBInfo ni = wait( ccInterface->get().present() ? brokenPromiseToNever( ccInterface->get().get().getServerDBInfo.getReply( req ) ) : Never() ) ) {
|
||||
TraceEvent("GotServerDBInfoChange").detail("ChangeID", ni.id).detail("MasterID", ni.master.id())
|
||||
.detail("RatekeeperID", ni.ratekeeper.present() ? ni.ratekeeper.get().id() : UID())
|
||||
.detail("DataDistributorID", ni.distributor.present() ? ni.distributor.get().id() : UID());
|
||||
ServerDBInfo localInfo = ni;
|
||||
localInfo.myLocality = locality;
|
||||
|
@ -850,6 +851,7 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
recruited.initEndpoints();
|
||||
|
||||
if (rkInterf->get().present()) {
|
||||
recruited = rkInterf->get().get();
|
||||
TEST(true); // Recruited while already a ratekeeper.
|
||||
} else {
|
||||
startRole(Role::RATE_KEEPER, recruited.id(), interf.id());
|
||||
|
@ -859,8 +861,8 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
|
||||
Future<Void> ratekeeper = rateKeeper( recruited, dbInfo );
|
||||
errorForwarders.add( forwardError( errors, Role::RATE_KEEPER, recruited.id(), setWhenDoneOrError( ratekeeper, rkInterf, Optional<RatekeeperInterface>() ) ) );
|
||||
rkInterf->set(Optional<RatekeeperInterface>(recruited));
|
||||
}
|
||||
rkInterf->set(Optional<RatekeeperInterface>(recruited));
|
||||
TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
|
||||
req.reply.send(recruited);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue