make sure id_worker is updated before setting ratekeeper or data distribution

This commit is contained in:
Evan Tschannen 2019-03-22 17:08:54 -07:00
parent 6a9c9d79cc
commit 82c80c225d
1 changed files with 23 additions and 29 deletions

View File

@ -1845,6 +1845,29 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
}
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded );
checkOutstandingRequests( self );
} else if( info->second.details.interf.id() != w.id() || req.generation >= info->second.gen ) {
if (!info->second.reply.isSet()) {
info->second.reply.send( Never() );
}
info->second.reply = req.reply;
info->second.details.processClass = newProcessClass;
info->second.priorityInfo = newPriorityInfo;
info->second.initialClass = req.initialClass;
info->second.details.degraded = req.degraded;
info->second.gen = req.generation;
if(info->second.details.interf.id() != w.id()) {
info->second.details.interf = w;
info->second.watcher = workerAvailabilityWatch( w, newProcessClass, self );
}
checkOutstandingRequests( self );
} else {
TEST(true); // Received an old worker registration request.
}
if (req.distributorInterf.present() && !self->db.serverInfo->get().distributor.present() &&
self->clusterControllerDcId == req.distributorInterf.get().locality.dcId() &&
!self->recruitingDistributor) {
@ -1877,35 +1900,6 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->addActor.send(brokenPromiseToNever(req.ratekeeperInterf.get().haltRatekeeper.getReply(HaltRatekeeperRequest(self->id))));
}
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded );
if (!self->masterProcessId.present() && w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
self->masterProcessId = w.locality.processId();
}
checkOutstandingRequests( self );
return;
}
if( info->second.details.interf.id() != w.id() || req.generation >= info->second.gen ) {
if (!info->second.reply.isSet()) {
info->second.reply.send( Never() );
}
info->second.reply = req.reply;
info->second.details.processClass = newProcessClass;
info->second.priorityInfo = newPriorityInfo;
info->second.initialClass = req.initialClass;
info->second.details.degraded = req.degraded;
info->second.gen = req.generation;
if(info->second.details.interf.id() != w.id()) {
info->second.details.interf = w;
info->second.watcher = workerAvailabilityWatch( w, newProcessClass, self );
}
checkOutstandingRequests( self );
return;
}
TEST(true); // Received an old worker registration request.
}
#define TIME_KEEPER_VERSION LiteralStringRef("1")