update better machine classes first to give them a higher chance of becoming the next cluster controller

This commit is contained in:
Evan Tschannen 2018-06-29 01:11:59 -07:00
parent e9ac8a1039
commit 7e68bee692
3 changed files with 67 additions and 37 deletions

View File

@ -1948,56 +1948,63 @@ ACTOR Future<Void> updatedChangingDatacenters(ClusterControllerData *self) {
//do not change the cluster controller until all the processes have had a chance to register
Void _ = wait( delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY) );
loop {
bool updateCCFirst = false;
if(self->desiredDcIds.get().present()) {
state Future<Void> onChange = self->desiredDcIds.onChange();
if(!self->desiredDcIds.get().present()) {
self->changingDcIds.set(std::make_pair(false,self->desiredDcIds.get()));
} else {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->desiredDcIds.get().get() );
updateCCFirst = worker.priorityInfo.dcFitness > newFitness;
self->changingDcIds.set(std::make_pair(updateCCFirst,self->desiredDcIds.get()));
if( !updateCCFirst ) {
for ( auto& it : self->id_worker ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changingDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
} else if ( worker.priorityInfo.dcFitness != newFitness ) {
self->changingDcIds.set(std::make_pair(worker.priorityInfo.dcFitness > newFitness,self->desiredDcIds.get()));
if ( worker.priorityInfo.dcFitness > newFitness ) {
worker.priorityInfo.dcFitness = newFitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
}
}
} else {
self->changingDcIds.set(std::make_pair(updateCCFirst,self->desiredDcIds.get()));
}
Void _ = wait(self->desiredDcIds.onChange());
}
}
ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
state Future<Void> changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY);
loop {
choose {
when( Void _ = wait(self->changingDcIds.onChange()) ) { changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY); }
when( Void _ = wait(changeDelay) ) {
changeDelay = Never();
self->changedDcIds.set(self->changingDcIds.get());
if(self->changedDcIds.get().second.present()) {
if( self->changedDcIds.get().first ) {
for ( auto& it : self->id_worker ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changedDcIds.get().second.get() );
} else {
state int currentFit = ProcessClass::BestFit;
while(currentFit <= ProcessClass::NeverAssign) {
bool updated = false;
for ( auto& it : self->id_worker ) {
if( ( !it.second.priorityInfo.isExcluded && it.second.priorityInfo.processClassFitness == currentFit ) || currentFit == ProcessClass::NeverAssign ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changingDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
updated = true;
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
} else {
}
if(updated && currentFit < ProcessClass::NeverAssign) {
Void _ = wait( delay(SERVER_KNOBS->CC_CLASS_DELAY) );
}
currentFit++;
}
}
}
Void _ = wait(onChange);
}
}
ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
state Future<Void> changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY);
state Future<Void> onChange = self->changingDcIds.onChange();
loop {
choose {
when( Void _ = wait(onChange) ) {
changeDelay = delay(SERVER_KNOBS->CC_CHANGE_DELAY);
onChange = self->changingDcIds.onChange();
}
when( Void _ = wait(changeDelay) ) {
changeDelay = Never();
onChange = self->changingDcIds.onChange();
self->changedDcIds.set(self->changingDcIds.get());
if(self->changedDcIds.get().second.present()) {
if( !self->changedDcIds.get().first ) {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->changedDcIds.get().second.get() );
if( worker.priorityInfo.dcFitness != newFitness ) {
@ -2006,6 +2013,27 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
}
}
} else {
state int currentFit = ProcessClass::BestFit;
while(currentFit <= ProcessClass::NeverAssign) {
bool updated = false;
for ( auto& it : self->id_worker ) {
if( ( !it.second.priorityInfo.isExcluded && it.second.priorityInfo.processClassFitness == currentFit ) || currentFit == ProcessClass::NeverAssign ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changedDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
updated = true;
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
}
}
}
}
if(updated && currentFit < ProcessClass::NeverAssign) {
Void _ = wait( delay(SERVER_KNOBS->CC_CLASS_DELAY) );
}
currentFit++;
}
}
}
}

View File

@ -270,6 +270,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0;
init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0;
init( CC_CHANGE_DELAY, 0.1 );
init( CC_CLASS_DELAY, 0.01 );
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 );
init( WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY, 5.0 );
init( ATTEMPT_RECRUITMENT_DELAY, 0.035 );

View File

@ -211,6 +211,7 @@ public:
double SHUTDOWN_TIMEOUT;
double MASTER_SPIN_DELAY;
double CC_CHANGE_DELAY;
double CC_CLASS_DELAY;
double WAIT_FOR_GOOD_RECRUITMENT_DELAY;
double WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY;
double ATTEMPT_RECRUITMENT_DELAY;