Fix a race during ratekeeper registration
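When a ratekeeper registers, monitorRatekeeper wakes up on the serverInfo change and attempts to recruit another ratekeeper. Add a short delay (1 ms, `delay(0.001)`) after the change notification so that the newly registered ratekeeper is set first. Also, if a ratekeeper is recruited on a worker that is already running one, the worker now updates its ratekeeper interface anyway, so that the cluster controller can clear the ratekeeperID.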
This commit is contained in:
parent
8edefda193
commit
48324ad4be
|
@ -2540,19 +2540,25 @@ ACTOR Future<Void> monitorRatekeeper(ClusterControllerData *self) {
|
|||
loop {
|
||||
if (self->db.serverInfo->get().ratekeeper.present()) {
|
||||
ratekeeperFailed = waitFailureClient(self->db.serverInfo->get().ratekeeper.get().waitFailure, SERVER_KNOBS->RATEKEEPER_FAILURE_TIME);
|
||||
} else if (!recruitingRatekeeper) {
|
||||
} else if (!recruitingRatekeeper && !self->recruitingRatekeeperID.present()) {
|
||||
// Ratekeeper worker registration may happen after startRatekeeper().
|
||||
// So checking recruitingRatekeeperID to make sure recruiting is actually done.
|
||||
recruitingRatekeeper = true;
|
||||
rkInterf = startRatekeeper(self);
|
||||
}
|
||||
choose {
|
||||
when ( wait(self->recruitRatekeeper.onTrigger()) ) {
|
||||
// Force recruiting even if we have a valid ratekeeper now.
|
||||
if (!recruitingRatekeeper) {
|
||||
if (!recruitingRatekeeper && !self->recruitingRatekeeperID.present()) {
|
||||
recruitingRatekeeper = true;
|
||||
rkInterf = startRatekeeper(self);
|
||||
}
|
||||
}
|
||||
when ( wait(self->db.serverInfo->onChange()) ) {}
|
||||
when ( wait(self->db.serverInfo->onChange()) ) {
|
||||
// When a new ratekeeper registers, this wakes up and attempts to recruit
|
||||
// another ratekeeper. So switch here to allow ratekeeper to be set first.
|
||||
wait(delay(0.001));
|
||||
}
|
||||
when ( wait(ratekeeperFailed) ) {
|
||||
ratekeeperFailed = Never();
|
||||
TraceEvent("CC_RateKeeperDied", self->id)
|
||||
|
|
|
@ -850,7 +850,6 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
recruited.initEndpoints();
|
||||
|
||||
if (rkInterf->get().present()) {
|
||||
recruited = rkInterf->get().get();
|
||||
TEST(true); // Recruited while already a ratekeeper.
|
||||
} else {
|
||||
startRole(Role::RATE_KEEPER, recruited.id(), interf.id());
|
||||
|
@ -860,8 +859,8 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
|
|||
|
||||
Future<Void> ratekeeper = rateKeeper( recruited, dbInfo );
|
||||
errorForwarders.add( forwardError( errors, Role::RATE_KEEPER, recruited.id(), setWhenDoneOrError( ratekeeper, rkInterf, Optional<RatekeeperInterface>() ) ) );
|
||||
rkInterf->set(Optional<RatekeeperInterface>(recruited));
|
||||
}
|
||||
rkInterf->set(Optional<RatekeeperInterface>(recruited));
|
||||
TraceEvent("Ratekeeper_InitRequest", req.reqId).detail("RatekeeperId", recruited.id());
|
||||
req.reply.send(recruited);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue