simplified ratekeeper monitoring loop

This commit is contained in:
Evan Tschannen 2019-03-22 18:22:45 -07:00
parent 12917d8c7d
commit ddb6058770
1 changed file with 14 additions and 34 deletions

View File

@ -1129,13 +1129,13 @@ public:
PromiseStream<Future<Void>> addActor;
bool recruitingDistributor;
Optional<UID> recruitingRatekeeperID;
AsyncTrigger recruitRatekeeper;
AsyncVar<bool> recruitRatekeeper;
ClusterControllerData( ClusterControllerFullInterface const& ccInterface, LocalityData const& locality )
: clusterControllerProcessId(locality.processId()), clusterControllerDcId(locality.dcId()),
id(ccInterface.id()), ac(false), outstandingRequestChecker(Void()), gotProcessClasses(false),
gotFullyRecoveredConfig(false), startTime(now()), datacenterVersionDifference(0),
versionDifferenceUpdated(false), recruitingDistributor(false)
versionDifferenceUpdated(false), recruitingDistributor(false), recruitRatekeeper(false)
{
auto serverInfo = db.serverInfo->get();
serverInfo.id = g_random->randomUniqueID();
@ -2531,12 +2531,14 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
if (self->db.serverInfo->get().ratekeeper.present() && self->db.serverInfo->get().ratekeeper.get().locality.processId() == worker.interf.locality.processId()) {
throw no_more_servers(); // Avoid recruiting an existing one.
}
self->recruitingRatekeeperID = req.reqId;
TraceEvent("ClusterController_RecruitRatekeeper", self->id).detail("Addr", worker.interf.address()).detail("RKID", req.reqId);
ErrorOr<RatekeeperInterface> interf = wait( worker.interf.ratekeeper.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_RATEKEEPER_JOIN_DELAY, 0) );
if (interf.present()) {
TraceEvent("ClusterController_RatekeeperRecruited", self->id).detail("Addr", worker.interf.address());
self->recruitRatekeeper.set(false);
self->recruitingRatekeeperID = Optional<UID>();
const auto& ratekeeper = self->db.serverInfo->get().ratekeeper;
TraceEvent("CC_RegisterRatekeeper", self->id).detail("RKID", interf.get().id());
@ -2548,6 +2550,7 @@ ACTOR Future<Void> startRatekeeper(ClusterControllerData *self) {
if(!ratekeeper.present() || ratekeeper.get().id() != interf.get().id()) {
self->db.setRatekeeper(interf.get());
}
checkOutstandingRequests(self);
return Void();
}
}
@ -2576,41 +2579,18 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
}
}
state Future<Void> ratekeeperFailed = Never();
state Future<Void> rkInterf = Never();
state bool recruitingRatekeeper = false;
loop {
if (self->db.serverInfo->get().ratekeeper.present()) {
ratekeeperFailed = waitFailureClient(self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME);
} else if (!recruitingRatekeeper && !self->recruitingRatekeeperID.present()) {
// Ratekeeper worker registration may happen after startRatekeeper().
// So checking recruitingRatekeeperID to make sure recruiting is actually done.
recruitingRatekeeper = true;
rkInterf = startRatekeeper(self);
}
choose {
when ( wait(self->recruitRatekeeper.onTrigger()) ) {
// Force recruiting even if we have a valid ratekeeper now.
if (!recruitingRatekeeper && !self->recruitingRatekeeperID.present()) {
recruitingRatekeeper = true;
rkInterf = startRatekeeper(self);
if ( self->db.serverInfo->get().ratekeeper.present() && !self->recruitRatekeeper.get() ) {
choose {
when(wait(waitFailureClient( self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME ))) {
TraceEvent("ClusterController_RateKeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
self->db.clearInterf(ProcessClass::RateKeeperClass);
}
when(wait(self->recruitRatekeeper.onChange())) {}
}
when ( wait(self->db.serverInfo->onChange()) ) {
// When a new ratekeeper registers, this wakes up and attempts to recruit
// another ratekeeper. So switch here to allow ratekeeper to be set first.
wait(delay(0.001));
}
when ( wait(ratekeeperFailed) ) {
ratekeeperFailed = Never();
TraceEvent("CC_RatekeeperDied", self->id)
.detail("RKID", self->db.serverInfo->get().ratekeeper.get().id());
self->db.clearInterf(ProcessClass::RateKeeperClass);
}
when ( wait(rkInterf) ) {
rkInterf = Never();
recruitingRatekeeper = false;
}
} else {
wait( startRatekeeper(self) );
}
}
}