Merge pull request #2241 from etschannen/feature-recruitment-cleanup

Fixed a few small issues with recruitment logic on the cluster controller
This commit is contained in:
Evan Tschannen 2019-10-16 16:25:42 -07:00 committed by GitHub
commit a85f69c62f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 89 additions and 43 deletions

View File

@ -599,7 +599,8 @@ Regions are configured in FoundationDB as a json document. For example::
"datacenters":[{
"id":"WC1",
"priority":1,
"satellite":1
"satellite":1,
"satellite_logs":2
}],
"satellite_redundancy_mode":"one_satellite_double",
"satellite_logs":2
@ -659,7 +660,8 @@ This is the region configuration that implements the example::
},{
"id":"WC2",
"priority":0,
"satellite":1
"satellite":1,
"satellite_logs":2
}],
"satellite_redundancy_mode":"one_satellite_double"
},{

View File

@ -498,7 +498,8 @@
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
"satellite":1,
"satellite_logs":2
}],
"satellite_redundancy_mode":{
"$enum":[

View File

@ -73,6 +73,7 @@ void parse( std::vector<RegionInfo>* regions, ValueRef const& v ) {
s.get("id", idStr);
satInfo.dcId = idStr;
s.get("priority", satInfo.priority);
s.tryGet("satellite_logs", satInfo.satelliteDesiredTLogCount);
info.satellites.push_back(satInfo);
} else {
if (foundNonSatelliteDatacenter) throw invalid_option();
@ -365,6 +366,9 @@ StatusArray DatabaseConfiguration::getRegionJSON() const {
satObj["id"] = s.dcId.toString();
satObj["priority"] = s.priority;
satObj["satellite"] = 1;
if(s.satelliteDesiredTLogCount != -1) {
satObj["satellite_logs"] = s.satelliteDesiredTLogCount;
}
dcArr.push_back(satObj);
}

View File

@ -32,6 +32,7 @@
struct SatelliteInfo {
Key dcId;
int32_t priority;
int32_t satelliteDesiredTLogCount = -1;
SatelliteInfo() : priority(0) {}
@ -41,7 +42,7 @@ struct SatelliteInfo {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, dcId, priority);
serializer(ar, dcId, priority, satelliteDesiredTLogCount);
}
};

View File

@ -67,7 +67,7 @@ std::map<std::string, std::string> configForToken( std::string const& mode ) {
std::string key = mode.substr(0, pos);
std::string value = mode.substr(pos+1);
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "log_routers" || key == "satellite_logs" || key == "usable_regions" || key == "repopulate_anti_quorum") && isInteger(value) ) {
if( (key == "logs" || key == "proxies" || key == "resolvers" || key == "remote_logs" || key == "log_routers" || key == "usable_regions" || key == "repopulate_anti_quorum") && isInteger(value) ) {
out[p+key] = value;
}

View File

@ -522,7 +522,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
"satellite":1,
"satellite_logs":2
}],
"satellite_redundancy_mode":{
"$enum":[
@ -732,7 +733,8 @@ const KeyRef JSONSchemas::clusterConfigurationSchema = LiteralStringRef(R"config
"datacenters":[{
"id":"mr",
"priority":1,
"satellite":1
"satellite":1,
"satellite_logs":2
}],
"satellite_redundancy_mode":{
"$enum":[

View File

@ -398,8 +398,14 @@ public:
try {
bool remoteDCUsedAsSatellite = false;
std::set<Optional<Key>> satelliteDCs;
int32_t desiredSatelliteTLogs = 0;
for(int s = startDC; s < std::min<int>(startDC + (satelliteFallback ? region.satelliteTLogUsableDcsFallback : region.satelliteTLogUsableDcs), region.satellites.size()); s++) {
satelliteDCs.insert(region.satellites[s].dcId);
if(region.satellites[s].satelliteDesiredTLogCount == -1 || desiredSatelliteTLogs == -1) {
desiredSatelliteTLogs = -1;
} else {
desiredSatelliteTLogs += region.satellites[s].satelliteDesiredTLogCount;
}
if (region.satellites[s].dcId == remoteRegion.dcId) {
remoteDCUsedAsSatellite = true;
}
@ -413,9 +419,9 @@ public:
std::transform(remoteLogs.begin(), remoteLogs.end(), std::back_inserter(exclusionWorkerIds), [](const WorkerDetails &in) { return in.interf.id(); });
}
if(satelliteFallback) {
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactorFallback, conf.getDesiredSatelliteLogs(region.dcId)*region.satelliteTLogUsableDcsFallback/region.satelliteTLogUsableDcs, region.satelliteTLogPolicyFallback, id_used, checkStable, satelliteDCs, exclusionWorkerIds);
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactorFallback, desiredSatelliteTLogs>0 ? desiredSatelliteTLogs : conf.getDesiredSatelliteLogs(region.dcId)*region.satelliteTLogUsableDcsFallback/region.satelliteTLogUsableDcs, region.satelliteTLogPolicyFallback, id_used, checkStable, satelliteDCs, exclusionWorkerIds);
} else {
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactor, conf.getDesiredSatelliteLogs(region.dcId), region.satelliteTLogPolicy, id_used, checkStable, satelliteDCs, exclusionWorkerIds);
return getWorkersForTlogs( conf, region.satelliteTLogReplicationFactor, desiredSatelliteTLogs>0 ? desiredSatelliteTLogs : conf.getDesiredSatelliteLogs(region.dcId), region.satelliteTLogPolicy, id_used, checkStable, satelliteDCs, exclusionWorkerIds);
}
} catch (Error &e) {
if(e.code() != error_code_no_more_servers) {
@ -462,7 +468,7 @@ public:
deterministicRandom()->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
id_used[w[i].interf.locality.processId()]++;
return WorkerFitnessInfo(w[i], it.first.first, it.first.second);
return WorkerFitnessInfo(w[i], std::max(ProcessClass::GoodFit, it.first.first), it.first.second);
}
}
}
@ -518,18 +524,8 @@ public:
RoleFitness() : bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), role(ProcessClass::NoRole), count(0), worstIsDegraded(false) {}
RoleFitness(RoleFitness first, RoleFitness second, ProcessClass::ClusterRole role) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count), role(role) {
if(first.worstFit > second.worstFit) {
worstIsDegraded = first.worstIsDegraded;
} else if(second.worstFit > first.worstFit) {
worstIsDegraded = second.worstIsDegraded;
} else {
worstIsDegraded = first.worstIsDegraded || second.worstIsDegraded;
}
}
RoleFitness( vector<WorkerDetails> workers, ProcessClass::ClusterRole role ) : role(role) {
worstFit = ProcessClass::BestFit;
worstFit = ProcessClass::GoodFit;
worstIsDegraded = false;
bestFit = ProcessClass::NeverAssign;
for(auto& it : workers) {
@ -776,7 +772,7 @@ public:
auto datacenters = getDatacenters( req.configuration );
RoleFitness bestFitness;
std::pair<RoleFitness,RoleFitness> bestFitness;
int numEquivalent = 1;
Optional<Key> bestDC;
@ -793,7 +789,7 @@ public:
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
auto fitness = RoleFitness( RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver), ProcessClass::NoRole );
auto fitness = std::make_pair( RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver) );
if(dcId == clusterControllerDcId) {
bestFitness = fitness;
@ -839,7 +835,8 @@ public:
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
RoleFitness(std::min(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), std::max(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), req.configuration.getDesiredProxies()+req.configuration.getDesiredResolvers(), ProcessClass::NoRole).betterCount(bestFitness) ) ) {
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(bestFitness.first) ||
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers(), ProcessClass::Resolver).betterCount(bestFitness.second) ) ) {
throw operation_failed();
}
@ -985,10 +982,14 @@ public:
std::map< Optional<Standalone<StringRef>>, int> id_used;
id_used[clusterControllerProcessId]++;
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, true);
auto newMasterFit = mworker.worker.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(mworker.worker.interf.address())) {
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}
if ( oldMasterFit < mworker.fitness )
if ( oldMasterFit < newMasterFit )
return false;
if ( oldMasterFit > mworker.fitness || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.worker.interf.locality.processId() != clusterControllerProcessId ) )
if ( oldMasterFit > newMasterFit || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.worker.interf.locality.processId() != clusterControllerProcessId ) )
return true;
std::set<Optional<Key>> primaryDC;
@ -1018,6 +1019,7 @@ public:
if(oldTLogFit < newTLogFit) return false;
bool oldSatelliteFallback = false;
for(auto& logSet : dbi.logSystemConfig.tLogs) {
if(logSet.isLocal && logSet.locality == tagLocalitySatellite) {
oldSatelliteFallback = logSet.tLogPolicy->info() != region.satelliteTLogPolicy->info();
@ -1031,11 +1033,42 @@ public:
auto newSatelliteTLogs = region.satelliteTLogReplicationFactor > 0 ? getWorkersForSatelliteLogs(db.config, region, remoteRegion, id_used, newSatelliteFallback, true) : satellite_tlogs;
RoleFitness newSatelliteTLogFit(newSatelliteTLogs, ProcessClass::TLog);
if(oldSatelliteTLogFit < newSatelliteTLogFit)
return false;
std::map<Optional<Key>,int32_t> satellite_priority;
for(auto& r : region.satellites) {
satellite_priority[r.dcId] = r.priority;
}
int32_t oldSatelliteRegionFit = std::numeric_limits<int32_t>::max();
for(auto& it : satellite_tlogs) {
if(satellite_priority.count(it.interf.locality.dcId())) {
oldSatelliteRegionFit = std::min(oldSatelliteRegionFit, satellite_priority[it.interf.locality.dcId()]);
} else {
oldSatelliteRegionFit = -1;
}
}
int32_t newSatelliteRegionFit = std::numeric_limits<int32_t>::max();
for(auto& it : newSatelliteTLogs) {
if(satellite_priority.count(it.interf.locality.dcId())) {
newSatelliteRegionFit = std::min(newSatelliteRegionFit, satellite_priority[it.interf.locality.dcId()]);
} else {
newSatelliteRegionFit = -1;
}
}
if(oldSatelliteFallback && !newSatelliteFallback)
return true;
if(!oldSatelliteFallback && newSatelliteFallback)
return false;
if(oldSatelliteRegionFit < newSatelliteRegionFit)
return true;
if(oldSatelliteRegionFit > newSatelliteRegionFit)
return false;
if(oldSatelliteTLogFit < newSatelliteTLogFit)
return false;
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
std::vector<UID> exclusionWorkerIds;
auto fn = [](const WorkerDetails &in) { return in.interf.id(); };
@ -1059,7 +1092,7 @@ public:
}
if(oldLogRoutersFit < newLogRoutersFit) return false;
// Check proxy/resolver fitness
RoleFitness oldInFit(RoleFitness(proxyClasses, ProcessClass::Proxy), RoleFitness(resolverClasses, ProcessClass::Resolver), ProcessClass::NoRole);
std::pair<RoleFitness,RoleFitness> oldInFit = std::make_pair(RoleFitness(proxyClasses, ProcessClass::Proxy), RoleFitness(resolverClasses, ProcessClass::Resolver));
auto first_resolver = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
auto first_proxy = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
@ -1069,12 +1102,13 @@ public:
proxies.push_back(first_proxy.worker);
resolvers.push_back(first_resolver.worker);
RoleFitness newInFit(RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver), ProcessClass::NoRole);
if(oldInFit.betterFitness(newInFit)) return false;
std::pair<RoleFitness,RoleFitness> newInFit = std::make_pair(RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver));
if(oldInFit.first.betterFitness(newInFit.first) || oldInFit.second.betterFitness(newInFit.second)) return false;
if(oldTLogFit > newTLogFit || oldInFit > newInFit || (oldSatelliteFallback && !newSatelliteFallback) || oldSatelliteTLogFit > newSatelliteTLogFit || oldRemoteTLogFit > newRemoteTLogFit || oldLogRoutersFit > newLogRoutersFit) {
TraceEvent("BetterMasterExists", id).detail("OldMasterFit", oldMasterFit).detail("NewMasterFit", mworker.fitness)
TraceEvent("BetterMasterExists", id).detail("OldMasterFit", oldMasterFit).detail("NewMasterFit", newMasterFit)
.detail("OldTLogFit", oldTLogFit.toString()).detail("NewTLogFit", newTLogFit.toString())
.detail("OldInFit", oldInFit.toString()).detail("NewInFit", newInFit.toString())
.detail("OldProxyFit", oldInFit.first.toString()).detail("NewProxyFit", newInFit.first.toString())
.detail("OldResolverFit", oldInFit.second.toString()).detail("NewResolverFit", newInFit.second.toString())
.detail("OldSatelliteFit", oldSatelliteTLogFit.toString()).detail("NewSatelliteFit", newSatelliteTLogFit.toString())
.detail("OldRemoteFit", oldRemoteTLogFit.toString()).detail("NewRemoteFit", newRemoteTLogFit.toString())
.detail("OldRouterFit", oldLogRoutersFit.toString()).detail("NewRouterFit", newLogRoutersFit.toString())

View File

@ -945,11 +945,8 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
}
}
if (deterministicRandom()->random01() < 0.25) {
int logs = deterministicRandom()->randomInt(1,7);
primaryObj["satellite_logs"] = logs;
remoteObj["satellite_logs"] = logs;
}
if (deterministicRandom()->random01() < 0.25) primaryObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
if (deterministicRandom()->random01() < 0.25) remoteObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
//We cannot run with a remote DC when MAX_READ_TRANSACTION_LIFE_VERSIONS is too small, because the log routers will not be able to keep up.
if (minimumRegions <= 1 && (deterministicRandom()->random01() < 0.25 || SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS < SERVER_KNOBS->VERSIONS_PER_SECOND)) {
@ -998,12 +995,14 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
primarySatelliteObj["id"] = useNormalDCsAsSatellites ? "1" : "2";
primarySatelliteObj["priority"] = 1;
primarySatelliteObj["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) primarySatelliteObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
primaryDcArr.push_back(primarySatelliteObj);
StatusObject remoteSatelliteObj;
remoteSatelliteObj["id"] = useNormalDCsAsSatellites ? "0" : "3";
remoteSatelliteObj["priority"] = 1;
remoteSatelliteObj["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) remoteSatelliteObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
remoteDcArr.push_back(remoteSatelliteObj);
if (datacenters > 4) {
@ -1011,12 +1010,14 @@ void SimulationConfig::generateNormalConfig(int minimumReplication, int minimumR
primarySatelliteObjB["id"] = useNormalDCsAsSatellites ? "2" : "4";
primarySatelliteObjB["priority"] = 1;
primarySatelliteObjB["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) primarySatelliteObjB["satellite_logs"] = deterministicRandom()->randomInt(1,7);
primaryDcArr.push_back(primarySatelliteObjB);
StatusObject remoteSatelliteObjB;
remoteSatelliteObjB["id"] = useNormalDCsAsSatellites ? "2" : "5";
remoteSatelliteObjB["priority"] = 1;
remoteSatelliteObjB["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) remoteSatelliteObjB["satellite_logs"] = deterministicRandom()->randomInt(1,7);
remoteDcArr.push_back(remoteSatelliteObjB);
}
if (useNormalDCsAsSatellites) {

View File

@ -75,12 +75,14 @@ std::string generateRegions() {
primarySatelliteObj["id"] = "2";
primarySatelliteObj["priority"] = 1;
primarySatelliteObj["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) primarySatelliteObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
primaryDcArr.push_back(primarySatelliteObj);
StatusObject remoteSatelliteObj;
remoteSatelliteObj["id"] = "3";
remoteSatelliteObj["priority"] = 1;
remoteSatelliteObj["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) remoteSatelliteObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
remoteDcArr.push_back(remoteSatelliteObj);
if(g_simulator.physicalDatacenters > 5 && deterministicRandom()->random01() < 0.5) {
@ -88,12 +90,14 @@ std::string generateRegions() {
primarySatelliteObjB["id"] = "4";
primarySatelliteObjB["priority"] = 1;
primarySatelliteObjB["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) primarySatelliteObjB["satellite_logs"] = deterministicRandom()->randomInt(1,7);
primaryDcArr.push_back(primarySatelliteObjB);
StatusObject remoteSatelliteObjB;
remoteSatelliteObjB["id"] = "5";
remoteSatelliteObjB["priority"] = 1;
remoteSatelliteObjB["satellite"] = 1;
if (deterministicRandom()->random01() < 0.25) remoteSatelliteObjB["satellite_logs"] = deterministicRandom()->randomInt(1,7);
remoteDcArr.push_back(remoteSatelliteObjB);
int satellite_replication_type = deterministicRandom()->randomInt(0,3);
@ -146,11 +150,8 @@ std::string generateRegions() {
}
}
if (deterministicRandom()->random01() < 0.25) {
int logs = deterministicRandom()->randomInt(1,7);
primaryObj["satellite_logs"] = logs;
remoteObj["satellite_logs"] = logs;
}
if (deterministicRandom()->random01() < 0.25) primaryObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
if (deterministicRandom()->random01() < 0.25) remoteObj["satellite_logs"] = deterministicRandom()->randomInt(1,7);
int remote_replication_type = deterministicRandom()->randomInt(0, 4);
switch (remote_replication_type) {