Take cluster controller role into consideration when recruiting workers

This commit is contained in:
Yichi Chiang 2017-10-25 10:35:46 -07:00
parent defdc6550d
commit 5fcef911f0
1 changed files with 8 additions and 4 deletions

View File

@ -188,9 +188,10 @@ public:
}
//FIXME: get master in the same datacenter as the proxies and resolvers for ratekeeper; however, this is difficult because the master is recruited before we know the cluster's configuration
std::pair<WorkerInterface, ProcessClass> getMasterWorker( DatabaseConfiguration const& conf, bool checkStable = false ) {
std::pair<WorkerInterface, ProcessClass> getMasterWorker( DatabaseConfiguration const& conf, NetworkAddress clusterControllerAddress, bool checkStable = false ) {
ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;
bool bestIsClusterController = false;
int numEquivalent = 1;
for( auto& it : id_worker ) {
auto fit = it.second.processClass.machineClassFitness( ProcessClass::Master );
@ -198,10 +199,11 @@ public:
fit = std::max(fit, ProcessClass::WorstFit);
}
if( workerAvailable(it.second, checkStable) && fit != ProcessClass::NeverAssign ) {
if( fit < bestFit ) {
if( fit < bestFit || (fit == bestFit && bestIsClusterController) ) {
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
bestFit = fit;
numEquivalent = 1;
bestIsClusterController = clusterControllerAddress == it.second.interf.address();
}
else if( fit != ProcessClass::NeverAssign && fit == bestFit && g_random->random01() < 1.0/++numEquivalent )
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
@ -632,6 +634,7 @@ public:
result.storageServers.push_back(storageServers[i].first);
}
id_used[db.serverInfo->get().myLocality.processId()]++;
id_used[masterProcessId]++;
auto tlogs = getWorkersForTlogsAcrossDatacenters( req.configuration, id_used );
for(int i = 0; i < tlogs.size(); i++)
@ -697,6 +700,7 @@ public:
if(masterWorker == id_worker.end())
return false;
id_used[db.serverInfo->get().myLocality.processId()]++;
id_used[masterProcessId]++;
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
@ -704,7 +708,7 @@ public:
oldMasterFit = std::max(oldMasterFit, ProcessClass::WorstFit);
}
ProcessClass::Fitness newMasterFit = getMasterWorker(db.config, true).second.machineClassFitness( ProcessClass::Master );
ProcessClass::Fitness newMasterFit = getMasterWorker(db.config, g_network->getLocalAddress(), true).second.machineClassFitness( ProcessClass::Master );
if(dbi.recoveryState < RecoveryState::FULLY_RECOVERED) {
if(oldMasterFit > newMasterFit) {
@ -834,7 +838,7 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
try {
state double recoveryStart = now();
TraceEvent("CCWDB", cluster->id).detail("Recruiting", "Master");
state std::pair<WorkerInterface, ProcessClass> masterWorker = cluster->getMasterWorker(db->config);
state std::pair<WorkerInterface, ProcessClass> masterWorker = cluster->getMasterWorker(db->config, g_network->getLocalAddress());
if( masterWorker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS && now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.second.machineClassFitness( ProcessClass::Master ));
Void _ = wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );