Changed preferredSharing to be ordered, so that recruitment will always share with the same other role when everything else is equal.

This commit is contained in:
Evan Tschannen 2021-04-26 09:57:46 -07:00
parent 9ca2c3b6c4
commit ccfc77f6fb
1 changed file with 93 additions and 75 deletions

View File

@ -1174,15 +1174,15 @@ public:
return bestFitness; return bestFitness;
} }
WorkerFitnessInfo getWorkerForRoleInDatacenter( WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId,
Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role,
ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness,
ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf,
DatabaseConfiguration const& conf, std::map<Optional<Standalone<StringRef>>, int>& id_used,
std::map<Optional<Standalone<StringRef>>, int>& id_used, std::map<Optional<Standalone<StringRef>>, int> preferredSharing =
Optional<Standalone<StringRef>> preferredSharing = Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
bool checkStable = false) { bool checkStable = false) {
std::map<std::tuple<ProcessClass::Fitness, int, bool, bool>, vector<WorkerDetails>> fitness_workers; std::map<std::tuple<ProcessClass::Fitness, int, bool, int>, vector<WorkerDetails>> fitness_workers;
for (auto& it : id_worker) { for (auto& it : id_worker) {
auto fitness = it.second.details.processClass.machineClassFitness(role); auto fitness = it.second.details.processClass.machineClassFitness(role);
@ -1191,10 +1191,11 @@ public:
} }
if (workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && if (workerAvailable(it.second, checkStable) && fitness < unacceptableFitness &&
it.second.details.interf.locality.dcId() == dcId) { it.second.details.interf.locality.dcId() == dcId) {
auto sharing = preferredSharing.find(it.first);
fitness_workers[std::make_tuple(fitness, fitness_workers[std::make_tuple(fitness,
id_used[it.first], id_used[it.first],
isLongLivedStateless(it.first), isLongLivedStateless(it.first),
preferredSharing != it.first)] sharing != preferredSharing.end() ? sharing->second : 1e6)]
.push_back(it.second.details); .push_back(it.second.details);
} }
} }
@ -1216,10 +1217,11 @@ public:
int amount, int amount,
DatabaseConfiguration const& conf, DatabaseConfiguration const& conf,
std::map<Optional<Standalone<StringRef>>, int>& id_used, std::map<Optional<Standalone<StringRef>>, int>& id_used,
std::map<Optional<Standalone<StringRef>>, int> preferredSharing =
std::map<Optional<Standalone<StringRef>>, int>(),
Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(),
bool checkStable = false) { bool checkStable = false) {
std::map<std::pair<ProcessClass::Fitness, int>, std::pair<vector<WorkerDetails>, vector<WorkerDetails>>> std::map<std::tuple<ProcessClass::Fitness, int, bool, int>, vector<WorkerDetails>> fitness_workers;
fitness_workers;
vector<WorkerDetails> results; vector<WorkerDetails> results;
if (minWorker.present()) { if (minWorker.present()) {
results.push_back(minWorker.get().worker); results.push_back(minWorker.get().worker);
@ -1237,24 +1239,22 @@ public:
(it.second.details.interf.id() != minWorker.get().worker.interf.id() && (it.second.details.interf.id() != minWorker.get().worker.interf.id() &&
(fitness < minWorker.get().fitness || (fitness < minWorker.get().fitness ||
(fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used))))) { (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used))))) {
if (isLongLivedStateless(it.first)) { auto sharing = preferredSharing.find(it.first);
fitness_workers[std::make_pair(fitness, id_used[it.first])].second.push_back(it.second.details); fitness_workers[std::make_tuple(fitness,
} else { id_used[it.first],
fitness_workers[std::make_pair(fitness, id_used[it.first])].first.push_back(it.second.details); isLongLivedStateless(it.first),
} sharing != preferredSharing.end() ? sharing->second : 1e6)]
.push_back(it.second.details);
} }
} }
for (auto& it : fitness_workers) { for (auto& it : fitness_workers) {
for (int j = 0; j < 2; j++) { deterministicRandom()->randomShuffle(it.second);
auto& w = j == 0 ? it.second.first : it.second.second; for (int i = 0; i < it.second.size(); i++) {
deterministicRandom()->randomShuffle(w); results.push_back(it.second[i]);
for (int i = 0; i < w.size(); i++) { id_used[it.second[i].interf.locality.processId()]++;
results.push_back(w[i]); if (results.size() == amount)
id_used[w[i].interf.locality.processId()]++; return results;
if (results.size() == amount)
return results;
}
} }
} }
@ -1475,20 +1475,16 @@ public:
} }
} }
std::map<Optional<Standalone<StringRef>>, int> preferredSharing;
auto first_commit_proxy = getWorkerForRoleInDatacenter( auto first_commit_proxy = getWorkerForRoleInDatacenter(
dcId, ProcessClass::CommitProxy, ProcessClass::ExcludeFit, req.configuration, id_used); dcId, ProcessClass::CommitProxy, ProcessClass::ExcludeFit, req.configuration, id_used, preferredSharing);
auto first_grv_proxy = getWorkerForRoleInDatacenter(dcId, preferredSharing[first_commit_proxy.worker.interf.locality.processId()] = 0;
ProcessClass::GrvProxy, auto first_grv_proxy = getWorkerForRoleInDatacenter(
ProcessClass::ExcludeFit, dcId, ProcessClass::GrvProxy, ProcessClass::ExcludeFit, req.configuration, id_used, preferredSharing);
req.configuration, preferredSharing[first_grv_proxy.worker.interf.locality.processId()] = 1;
id_used, auto first_resolver = getWorkerForRoleInDatacenter(
first_commit_proxy.worker.interf.locality.processId()); dcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, req.configuration, id_used, preferredSharing);
auto first_resolver = getWorkerForRoleInDatacenter(dcId, preferredSharing[first_resolver.worker.interf.locality.processId()] = 2;
ProcessClass::Resolver,
ProcessClass::ExcludeFit,
req.configuration,
id_used,
first_commit_proxy.worker.interf.locality.processId());
// If one of the first process recruitments is forced to share a process, allow all of next recruitments // If one of the first process recruitments is forced to share a process, allow all of next recruitments
// to also share a process. // to also share a process.
@ -1502,18 +1498,21 @@ public:
req.configuration.getDesiredCommitProxies(), req.configuration.getDesiredCommitProxies(),
req.configuration, req.configuration,
id_used, id_used,
preferredSharing,
first_commit_proxy); first_commit_proxy);
auto grv_proxies = getWorkersForRoleInDatacenter(dcId, auto grv_proxies = getWorkersForRoleInDatacenter(dcId,
ProcessClass::GrvProxy, ProcessClass::GrvProxy,
req.configuration.getDesiredGrvProxies(), req.configuration.getDesiredGrvProxies(),
req.configuration, req.configuration,
id_used, id_used,
preferredSharing,
first_grv_proxy); first_grv_proxy);
auto resolvers = getWorkersForRoleInDatacenter(dcId, auto resolvers = getWorkersForRoleInDatacenter(dcId,
ProcessClass::Resolver, ProcessClass::Resolver,
req.configuration.getDesiredResolvers(), req.configuration.getDesiredResolvers(),
req.configuration, req.configuration,
id_used, id_used,
preferredSharing,
first_resolver); first_resolver);
for (int i = 0; i < commit_proxies.size(); i++) for (int i = 0; i < commit_proxies.size(); i++)
result.commitProxies.push_back(commit_proxies[i].interf); result.commitProxies.push_back(commit_proxies[i].interf);
@ -1681,22 +1680,28 @@ public:
// SOMEDAY: recruitment in other DCs besides the clusterControllerDcID will not account for the // SOMEDAY: recruitment in other DCs besides the clusterControllerDcID will not account for the
// processes used by the master and cluster controller properly. // processes used by the master and cluster controller properly.
auto used = id_used; auto used = id_used;
auto first_commit_proxy = getWorkerForRoleInDatacenter( std::map<Optional<Standalone<StringRef>>, int> preferredSharing;
dcId, ProcessClass::CommitProxy, ProcessClass::ExcludeFit, req.configuration, used); auto first_commit_proxy = getWorkerForRoleInDatacenter(dcId,
auto first_grv_proxy = ProcessClass::CommitProxy,
getWorkerForRoleInDatacenter(dcId, ProcessClass::ExcludeFit,
ProcessClass::GrvProxy, req.configuration,
ProcessClass::ExcludeFit, used,
req.configuration, preferredSharing);
used, preferredSharing[first_commit_proxy.worker.interf.locality.processId()] = 0;
first_commit_proxy.worker.interf.locality.processId()); auto first_grv_proxy = getWorkerForRoleInDatacenter(dcId,
auto first_resolver = ProcessClass::GrvProxy,
getWorkerForRoleInDatacenter(dcId, ProcessClass::ExcludeFit,
ProcessClass::Resolver, req.configuration,
ProcessClass::ExcludeFit, used,
req.configuration, preferredSharing);
used, preferredSharing[first_grv_proxy.worker.interf.locality.processId()] = 1;
first_commit_proxy.worker.interf.locality.processId()); auto first_resolver = getWorkerForRoleInDatacenter(dcId,
ProcessClass::Resolver,
ProcessClass::ExcludeFit,
req.configuration,
used,
preferredSharing);
preferredSharing[first_resolver.worker.interf.locality.processId()] = 2;
// If one of the first process recruitments is forced to share a process, allow all of next // If one of the first process recruitments is forced to share a process, allow all of next
// recruitments to also share a process. // recruitments to also share a process.
@ -1710,6 +1715,7 @@ public:
req.configuration.getDesiredCommitProxies(), req.configuration.getDesiredCommitProxies(),
req.configuration, req.configuration,
used, used,
preferredSharing,
first_commit_proxy); first_commit_proxy);
auto grv_proxies = getWorkersForRoleInDatacenter(dcId, auto grv_proxies = getWorkersForRoleInDatacenter(dcId,
@ -1717,6 +1723,7 @@ public:
req.configuration.getDesiredGrvProxies(), req.configuration.getDesiredGrvProxies(),
req.configuration, req.configuration,
used, used,
preferredSharing,
first_grv_proxy); first_grv_proxy);
auto resolvers = getWorkersForRoleInDatacenter(dcId, auto resolvers = getWorkersForRoleInDatacenter(dcId,
@ -1724,6 +1731,7 @@ public:
req.configuration.getDesiredResolvers(), req.configuration.getDesiredResolvers(),
req.configuration, req.configuration,
used, used,
preferredSharing,
first_resolver); first_resolver);
auto fitness = std::make_tuple(RoleFitness(commit_proxies, ProcessClass::CommitProxy, used), auto fitness = std::make_tuple(RoleFitness(commit_proxies, ProcessClass::CommitProxy, used),
@ -1829,14 +1837,14 @@ public:
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true); true);
getWorkerForRoleInDatacenter(regions[0].dcId, getWorkerForRoleInDatacenter(regions[0].dcId,
ProcessClass::Master, ProcessClass::Master,
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true); true);
std::set<Optional<Key>> primaryDC; std::set<Optional<Key>> primaryDC;
@ -1858,21 +1866,21 @@ public:
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true); true);
getWorkerForRoleInDatacenter(regions[0].dcId, getWorkerForRoleInDatacenter(regions[0].dcId,
ProcessClass::CommitProxy, ProcessClass::CommitProxy,
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true); true);
getWorkerForRoleInDatacenter(regions[0].dcId, getWorkerForRoleInDatacenter(regions[0].dcId,
ProcessClass::GrvProxy, ProcessClass::GrvProxy,
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true); true);
vector<Optional<Key>> dcPriority; vector<Optional<Key>> dcPriority;
@ -2090,7 +2098,7 @@ public:
ProcessClass::NeverAssign, ProcessClass::NeverAssign,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true); true);
auto newMasterFit = mworker.worker.processClass.machineClassFitness(ProcessClass::Master); auto newMasterFit = mworker.worker.processClass.machineClassFitness(ProcessClass::Master);
if (db.config.isExcludedServer(mworker.worker.interf.addresses())) { if (db.config.isExcludedServer(mworker.worker.interf.addresses())) {
@ -2250,15 +2258,17 @@ public:
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter, old_id_used); RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter, old_id_used);
RoleFitness newLogRoutersFit = oldLogRoutersFit; RoleFitness newLogRoutersFit = oldLogRoutersFit;
if (db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) { if (db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) {
newLogRoutersFit = RoleFitness(getWorkersForRoleInDatacenter(*remoteDC.begin(), newLogRoutersFit =
ProcessClass::LogRouter, RoleFitness(getWorkersForRoleInDatacenter(*remoteDC.begin(),
newRouterCount, ProcessClass::LogRouter,
db.config, newRouterCount,
id_used, db.config,
Optional<WorkerFitnessInfo>(), id_used,
true), std::map<Optional<Standalone<StringRef>>, int>(),
ProcessClass::LogRouter, Optional<WorkerFitnessInfo>(),
id_used); true),
ProcessClass::LogRouter,
id_used);
} }
if (oldLogRoutersFit.count < oldRouterCount) { if (oldLogRoutersFit.count < oldRouterCount) {
@ -2276,27 +2286,31 @@ public:
RoleFitness oldGrvProxyFit(grvProxyClasses, ProcessClass::GrvProxy, old_id_used); RoleFitness oldGrvProxyFit(grvProxyClasses, ProcessClass::GrvProxy, old_id_used);
RoleFitness oldResolverFit(resolverClasses, ProcessClass::Resolver, old_id_used); RoleFitness oldResolverFit(resolverClasses, ProcessClass::Resolver, old_id_used);
std::map<Optional<Standalone<StringRef>>, int> preferredSharing;
auto first_commit_proxy = getWorkerForRoleInDatacenter(clusterControllerDcId, auto first_commit_proxy = getWorkerForRoleInDatacenter(clusterControllerDcId,
ProcessClass::CommitProxy, ProcessClass::CommitProxy,
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), preferredSharing,
true); true);
preferredSharing[first_commit_proxy.worker.interf.locality.processId()] = 0;
auto first_grv_proxy = getWorkerForRoleInDatacenter(clusterControllerDcId, auto first_grv_proxy = getWorkerForRoleInDatacenter(clusterControllerDcId,
ProcessClass::GrvProxy, ProcessClass::GrvProxy,
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
first_commit_proxy.worker.interf.locality.processId(), preferredSharing,
true); true);
preferredSharing[first_grv_proxy.worker.interf.locality.processId()] = 1;
auto first_resolver = getWorkerForRoleInDatacenter(clusterControllerDcId, auto first_resolver = getWorkerForRoleInDatacenter(clusterControllerDcId,
ProcessClass::Resolver, ProcessClass::Resolver,
ProcessClass::ExcludeFit, ProcessClass::ExcludeFit,
db.config, db.config,
id_used, id_used,
first_commit_proxy.worker.interf.locality.processId(), preferredSharing,
true); true);
preferredSharing[first_resolver.worker.interf.locality.processId()] = 2;
auto maxUsed = std::max({ first_commit_proxy.used, first_grv_proxy.used, first_resolver.used }); auto maxUsed = std::max({ first_commit_proxy.used, first_grv_proxy.used, first_resolver.used });
first_commit_proxy.used = maxUsed; first_commit_proxy.used = maxUsed;
first_grv_proxy.used = maxUsed; first_grv_proxy.used = maxUsed;
@ -2306,6 +2320,7 @@ public:
db.config.getDesiredCommitProxies(), db.config.getDesiredCommitProxies(),
db.config, db.config,
id_used, id_used,
preferredSharing,
first_commit_proxy, first_commit_proxy,
true); true);
auto grv_proxies = getWorkersForRoleInDatacenter(clusterControllerDcId, auto grv_proxies = getWorkersForRoleInDatacenter(clusterControllerDcId,
@ -2313,6 +2328,7 @@ public:
db.config.getDesiredGrvProxies(), db.config.getDesiredGrvProxies(),
db.config, db.config,
id_used, id_used,
preferredSharing,
first_grv_proxy, first_grv_proxy,
true); true);
auto resolvers = getWorkersForRoleInDatacenter(clusterControllerDcId, auto resolvers = getWorkersForRoleInDatacenter(clusterControllerDcId,
@ -2320,6 +2336,7 @@ public:
db.config.getDesiredResolvers(), db.config.getDesiredResolvers(),
db.config, db.config,
id_used, id_used,
preferredSharing,
first_resolver, first_resolver,
true); true);
@ -2336,6 +2353,7 @@ public:
nBackup, nBackup,
db.config, db.config,
id_used, id_used,
std::map<Optional<Standalone<StringRef>>, int>(),
Optional<WorkerFitnessInfo>(), Optional<WorkerFitnessInfo>(),
true), true),
ProcessClass::Backup, ProcessClass::Backup,
@ -2775,7 +2793,7 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
ProcessClass::NeverAssign, ProcessClass::NeverAssign,
self->db.config, self->db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true) true)
.worker; .worker;
if (self->onMasterIsBetter(newRKWorker, ProcessClass::Ratekeeper)) { if (self->onMasterIsBetter(newRKWorker, ProcessClass::Ratekeeper)) {
@ -2791,7 +2809,7 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
ProcessClass::NeverAssign, ProcessClass::NeverAssign,
self->db.config, self->db.config,
id_used, id_used,
Optional<Standalone<StringRef>>(), std::map<Optional<Standalone<StringRef>>, int>(),
true) true)
.worker; .worker;
if (self->onMasterIsBetter(newDDWorker, ProcessClass::DataDistributor)) { if (self->onMasterIsBetter(newDDWorker, ProcessClass::DataDistributor)) {