when the cluster controller is changing itself to a better dc fitness, it should notify itself first so another process does not take over

This commit is contained in:
Evan Tschannen 2018-06-29 00:10:29 -07:00
parent 899f880ce0
commit e9ac8a1039
1 changed files with 58 additions and 23 deletions

View File

@ -979,8 +979,8 @@ public:
Optional<Standalone<StringRef>> clusterControllerProcessId;
Optional<Standalone<StringRef>> clusterControllerDcId;
AsyncVar<Optional<vector<Optional<Key>>>> desiredDcIds; //desired DC priorities
AsyncVar<Optional<vector<Optional<Key>>>> changingDcIds; //current DC priorities for everyone other than the cluster controller process
AsyncVar<Optional<vector<Optional<Key>>>> changedDcIds; //current DC priority for the cluster controller process
AsyncVar<std::pair<bool,Optional<vector<Optional<Key>>>>> changingDcIds; //current DC priorities to change first, and whether that is the cluster controller
AsyncVar<std::pair<bool,Optional<vector<Optional<Key>>>>> changedDcIds; //current DC priorities to change second, and whether the cluster controller has been changed
UID id;
std::vector<RecruitFromConfigurationRequest> outstandingRecruitmentRequests;
std::vector<RecruitRemoteFromConfigurationRequest> outstandingRemoteRecruitmentRequests;
@ -1627,11 +1627,21 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
if ( w.address() == g_network->getLocalAddress() ) {
self->clusterControllerProcessId = w.locality.processId();
self->clusterControllerDcId = w.locality.dcId();
if(self->changedDcIds.get().present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changedDcIds.get().get() );
if(self->changingDcIds.get().first) {
if(self->changingDcIds.get().second.present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changingDcIds.get().second.get() );
}
} else if(self->changedDcIds.get().second.present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changedDcIds.get().second.get() );
}
} else {
if(!self->changingDcIds.get().first) {
if(self->changingDcIds.get().second.present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changingDcIds.get().second.get() );
}
} else if(self->changedDcIds.get().second.present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changedDcIds.get().second.get() );
}
} else if(self->changingDcIds.get().present()) {
newPriorityInfo.dcFitness = ClusterControllerPriorityInfo::calculateDCFitness( w.locality.dcId(), self->changingDcIds.get().get() );
}
// Check process class and exclusive property
@ -1938,19 +1948,32 @@ ACTOR Future<Void> updatedChangingDatacenters(ClusterControllerData *self) {
//do not change the cluster controller until all the processes have had a chance to register
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
loop {
self->changingDcIds.set(self->desiredDcIds.get());
if(self->changingDcIds.get().present()) {
for ( auto& it : self->id_worker ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changingDcIds.get().get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
bool updateCCFirst = false;
if(self->desiredDcIds.get().present()) {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->desiredDcIds.get().get() );
updateCCFirst = worker.priorityInfo.dcFitness > newFitness;
self->changingDcIds.set(std::make_pair(updateCCFirst,self->desiredDcIds.get()));
if( !updateCCFirst ) {
for ( auto& it : self->id_worker ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changingDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
} else if ( worker.priorityInfo.dcFitness != newFitness ) {
worker.priorityInfo.dcFitness = newFitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
}
}
} else {
self->changingDcIds.set(std::make_pair(updateCCFirst,self->desiredDcIds.get()));
}
Void _ = wait(self->desiredDcIds.onChange());
}
}
@ -1962,14 +1985,26 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
when( Void _ = wait(self->changingDcIds.onChange()) ) { changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY); }
when( Void _ = wait(changeDelay) ) {
changeDelay = Never();
self->changedDcIds.set(self->changingDcIds.get());
if(self->changedDcIds.get().present()) {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->changedDcIds.get().get() );
if ( worker.priorityInfo.dcFitness != fitness ) {
worker.priorityInfo.dcFitness = fitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
self->changedDcIds.set(self->changingDcIds.get());
if(self->changedDcIds.get().second.present()) {
if( self->changedDcIds.get().first ) {
for ( auto& it : self->id_worker ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changedDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
} else {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->changedDcIds.get().second.get() );
if( worker.priorityInfo.dcFitness != newFitness ) {
worker.priorityInfo.dcFitness = newFitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
}
}
}
}