Fix wait failure bug on cluster controller

The setDistributor() sets an AsyncVar and then runs waitFailureClient. This
ordering is wrong because the AsyncVar::set triggers the other loop to run
first, which will wait on Never(). The correct code should wait on the Future
returned by the waitFailureClient.
This commit is contained in:
Jingyu Zhou 2019-01-29 10:14:11 -08:00 committed by Jingyu Zhou
parent 00f2253229
commit 7897616164
4 changed files with 13 additions and 10 deletions

View File

@ -116,12 +116,12 @@ public:
ServerDBInfo newInfo = serverInfo->get();
newInfo.id = g_random->randomUniqueID();
newInfo.distributor = distributorInterf;
serverInfo->set( newInfo );
if ( distributorInterf.isValid() ) {
distributorFailed = waitFailureClient( distributorInterf.waitFailure, SERVER_KNOBS->DD_FAILURE_TIME );
} else {
distributorFailed = Never();
}
serverInfo->set( newInfo );
}
};
@ -2296,14 +2296,14 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
state InitializeDataDistributorRequest req;
req.reqId = g_random->randomUniqueID();
TraceEvent("DataDistributorReqID", req.reqId).detail("Recruit", data_distributor.worker.first.address());
TraceEvent("ClusterController_DataDistributorReqID", req.reqId).detail("Recruit", data_distributor.worker.first.address());
ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, 1, 0) );
if (distributor.present()) {
TraceEvent("DataDistributorReqID", req.reqId).detail("Recruited", data_distributor.worker.first.address());
TraceEvent("ClusterController_DataDistributorReqID", req.reqId).detail("Recruited", data_distributor.worker.first.address());
return distributor.get();
}
TraceEvent("DataDistributorReqID", req.reqId)
TraceEvent("ClusterController_DataDistributorReqID", req.reqId)
.detail("RecruitFailed", data_distributor.worker.first.address())
.error(distributor.getError());
}
@ -2315,14 +2315,17 @@ ACTOR Future<Void> waitDDRejoinOrStartDD( ClusterControllerData *self, ClusterCo
// wait for a while to see if existing data distributor will join.
loop choose {
when ( DataDistributorRejoinRequest req = waitNext( clusterInterface->dataDistributorRejoin.getFuture() ) ) {
TraceEvent("ClusterController", self->id).detail("DataDistributorRejoinID", req.dataDistributor.id());
TraceEvent("ClusterController_Rejoin", self->id).detail("DataDistributorID", req.dataDistributor.id());
self->db.setDistributor( req.dataDistributor );
req.reply.send( Void() );
break;
}
when ( wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) ) ) { break; }
when ( wait( self->db.serverInfo->onChange() ) ) { // Rejoins via worker registration
if ( self->db.serverInfo->get().distributor.isValid() ) break;
if ( self->db.serverInfo->get().distributor.isValid() ) {
TraceEvent("ClusterController_InfoChange", self->id).detail("DataDistributorID", self->db.serverInfo->get().distributor.id());
break;
}
}
}

View File

@ -3413,7 +3413,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
TraceEvent("DataDistributor", di.id()).detail("IncomingID", distributor.id()).detail("Valid", distributor.isValid());
if ( distributor.isValid() && distributor.id() != di.id() ) {
TraceEvent("DataDistributorExit", di.id()).detail("CurrentLiveID", distributor.id());
break;
// break;
}
}
when ( wait( trigger ) ) {
@ -3434,10 +3434,10 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
}
catch ( Error &err ) {
if ( normalDataDistributorErrors().count(err.code()) == 0 ) {
TraceEvent("DataDistributorError", di.id()).error(err);
TraceEvent("DataDistributorError", di.id()).error(err, true);
throw err;
}
TraceEvent("DataDistributorDied", di.id()).error(err);
TraceEvent("DataDistributorDied", di.id()).error(err, true);
}
return Void();

View File

@ -1909,7 +1909,6 @@ ACTOR Future<Void> updateLogSystem(TLogData* self, Reference<LogData> logData, L
ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, LocalityData locality ) {
state TLogInterface recruited(self->dbgid, locality);
recruited.locality = locality;
recruited.initEndpoints();
DUMPTOKEN( recruited.peekMessages );

View File

@ -290,6 +290,7 @@ ACTOR Future<Void> registrationClient(
asyncPriorityInfo->set( reply.priorityInfo );
}
when ( wait( ccInterface->onChange() )) { }
when ( wait( ddInterf->onChange() ) ) {}
}
}
}