tlog recruitment will prefer non-degraded processes, however it will not choose less than desired number of tlogs to avoid degraded processes
better master exists will switch the master to avoid degraded processes
This commit is contained in:
parent
53f16b5347
commit
45fe6b369b
|
@ -264,7 +264,7 @@ public:
|
|||
}
|
||||
|
||||
std::vector<WorkerDetails> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, IRepPolicyRef const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>() ) {
|
||||
std::map<ProcessClass::Fitness, vector<WorkerDetails>> fitness_workers;
|
||||
std::map<std::pair<ProcessClass::Fitness,bool>, vector<WorkerDetails>> fitness_workers;
|
||||
std::vector<WorkerDetails> results;
|
||||
std::vector<LocalityData> unavailableLocals;
|
||||
LocalitySetRef logServerSet;
|
||||
|
@ -277,7 +277,7 @@ public:
|
|||
for( auto& it : id_worker ) {
|
||||
auto fitness = it.second.details.processClass.machineClassFitness( ProcessClass::TLog );
|
||||
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId())) ) {
|
||||
fitness_workers[ fitness ].push_back(it.second.details);
|
||||
fitness_workers[ std::make_pair(fitness,it.second.details.degraded) ].push_back(it.second.details);
|
||||
}
|
||||
else {
|
||||
unavailableLocals.push_back(it.second.details.interf.locality);
|
||||
|
@ -285,47 +285,51 @@ public:
|
|||
}
|
||||
|
||||
results.reserve(results.size() + id_worker.size());
|
||||
for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign; fitness ++)
|
||||
for (int fitness = ProcessClass::BestFit; fitness != ProcessClass::NeverAssign && !bCompleted; fitness++)
|
||||
{
|
||||
auto fitnessEnum = (ProcessClass::Fitness) fitness;
|
||||
if (fitness_workers.find(fitnessEnum) == fitness_workers.end())
|
||||
continue;
|
||||
for (auto& worker : fitness_workers[(ProcessClass::Fitness) fitness] ) {
|
||||
logServerMap->add(worker.interf.locality, &worker);
|
||||
}
|
||||
if (logServerSet->size() < required) {
|
||||
TraceEvent(SevWarn,"GWFTADTooFew", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired);
|
||||
}
|
||||
else if (logServerSet->size() == required || logServerSet->size() <= desired) {
|
||||
if (logServerSet->validate(policy)) {
|
||||
for (auto& object : logServerMap->getObjects()) {
|
||||
results.push_back(*object);
|
||||
}
|
||||
bCompleted = true;
|
||||
break;
|
||||
for(int addingDegraded = 0; addingDegraded < 2; addingDegraded++) {
|
||||
auto workerItr = fitness_workers.find(std::make_pair(fitnessEnum,(bool)addingDegraded));
|
||||
if (workerItr == fitness_workers.end()) {
|
||||
continue;
|
||||
}
|
||||
TraceEvent(SevWarn,"GWFTADNotAcceptable", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy",policy->info()).detail("DesiredLogs", desired);
|
||||
}
|
||||
// Try to select the desired size, if larger
|
||||
else {
|
||||
std::vector<LocalityEntry> bestSet;
|
||||
std::vector<LocalityData> tLocalities;
|
||||
for (auto& worker : workerItr->second ) {
|
||||
logServerMap->add(worker.interf.locality, &worker);
|
||||
}
|
||||
if (logServerSet->size() < required) {
|
||||
TraceEvent(SevWarn,"GWFTADTooFew", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired).detail("AddingDegraded", addingDegraded);
|
||||
}
|
||||
else if (logServerSet->size() == required || logServerSet->size() <= desired) {
|
||||
if (logServerSet->validate(policy)) {
|
||||
for (auto& object : logServerMap->getObjects()) {
|
||||
results.push_back(*object);
|
||||
}
|
||||
bCompleted = true;
|
||||
break;
|
||||
}
|
||||
TraceEvent(SevWarn,"GWFTADNotAcceptable", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy",policy->info()).detail("DesiredLogs", desired).detail("AddingDegraded", addingDegraded);
|
||||
}
|
||||
// Try to select the desired size, if larger
|
||||
else {
|
||||
std::vector<LocalityEntry> bestSet;
|
||||
std::vector<LocalityData> tLocalities;
|
||||
|
||||
// Try to find the best team of servers to fulfill the policy
|
||||
if (findBestPolicySet(bestSet, logServerSet, policy, desired, SERVER_KNOBS->POLICY_RATING_TESTS, SERVER_KNOBS->POLICY_GENERATIONS)) {
|
||||
results.reserve(results.size() + bestSet.size());
|
||||
for (auto& entry : bestSet) {
|
||||
auto object = logServerMap->getObject(entry);
|
||||
ASSERT(object);
|
||||
results.push_back(*object);
|
||||
tLocalities.push_back(object->interf.locality);
|
||||
// Try to find the best team of servers to fulfill the policy
|
||||
if (findBestPolicySet(bestSet, logServerSet, policy, desired, SERVER_KNOBS->POLICY_RATING_TESTS, SERVER_KNOBS->POLICY_GENERATIONS)) {
|
||||
results.reserve(results.size() + bestSet.size());
|
||||
for (auto& entry : bestSet) {
|
||||
auto object = logServerMap->getObject(entry);
|
||||
ASSERT(object);
|
||||
results.push_back(*object);
|
||||
tLocalities.push_back(object->interf.locality);
|
||||
}
|
||||
TraceEvent("GWFTADBestResults", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("BestCount", bestSet.size()).detail("BestZones", ::describeZones(tLocalities))
|
||||
.detail("BestDataHalls", ::describeDataHalls(tLocalities)).detail("TLogPolicy", policy->info()).detail("TotalResults", results.size()).detail("DesiredLogs", desired).detail("AddingDegraded", addingDegraded);
|
||||
bCompleted = true;
|
||||
break;
|
||||
}
|
||||
TraceEvent("GWFTADBestResults", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("BestCount", bestSet.size()).detail("BestZones", ::describeZones(tLocalities))
|
||||
.detail("BestDataHalls", ::describeDataHalls(tLocalities)).detail("TLogPolicy", policy->info()).detail("TotalResults", results.size()).detail("DesiredLogs", desired);
|
||||
bCompleted = true;
|
||||
break;
|
||||
TraceEvent(SevWarn,"GWFTADNoBest", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired).detail("AddingDegraded", addingDegraded);
|
||||
}
|
||||
TraceEvent(SevWarn,"GWFTADNoBest", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -454,39 +458,44 @@ public:
|
|||
ProcessClass::Fitness worstFit;
|
||||
ProcessClass::ClusterRole role;
|
||||
int count;
|
||||
bool worstIsDegraded;
|
||||
|
||||
RoleFitness(int bestFit, int worstFit, int count, ProcessClass::ClusterRole role) : bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), count(count), role(role) {}
|
||||
RoleFitness(int bestFit, int worstFit, int count, ProcessClass::ClusterRole role) : bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), count(count), role(role), worstIsDegraded(false) {}
|
||||
|
||||
RoleFitness(int fitness, int count, ProcessClass::ClusterRole role) : bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), count(count), role(role) {}
|
||||
RoleFitness(int fitness, int count, ProcessClass::ClusterRole role) : bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), count(count), role(role), worstIsDegraded(false) {}
|
||||
|
||||
RoleFitness() : bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), role(ProcessClass::NoRole), count(0) {}
|
||||
RoleFitness() : bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), role(ProcessClass::NoRole), count(0), worstIsDegraded(false) {}
|
||||
|
||||
RoleFitness(RoleFitness first, RoleFitness second, ProcessClass::ClusterRole role) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count), role(role) { }
|
||||
RoleFitness(RoleFitness first, RoleFitness second, ProcessClass::ClusterRole role) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count), role(role) {
|
||||
if(first.worstFit > second.worstFit) {
|
||||
worstIsDegraded = first.worstIsDegraded;
|
||||
} else if(second.worstFit > first.worstFit) {
|
||||
worstIsDegraded = second.worstIsDegraded;
|
||||
} else {
|
||||
worstIsDegraded = first.worstIsDegraded || second.worstIsDegraded;
|
||||
}
|
||||
}
|
||||
|
||||
RoleFitness( vector<WorkerDetails> workers, ProcessClass::ClusterRole role ) : role(role) {
|
||||
worstFit = ProcessClass::BestFit;
|
||||
worstIsDegraded = false;
|
||||
bestFit = ProcessClass::NeverAssign;
|
||||
for(auto it : workers) {
|
||||
for(auto& it : workers) {
|
||||
auto thisFit = it.processClass.machineClassFitness( role );
|
||||
worstFit = std::max(worstFit, thisFit);
|
||||
if(thisFit > worstFit) {
|
||||
worstFit = thisFit;
|
||||
worstIsDegraded = it.degraded;
|
||||
} else if(thisFit == worstFit) {
|
||||
worstIsDegraded = worstIsDegraded || it.degraded;
|
||||
}
|
||||
bestFit = std::min(bestFit, thisFit);
|
||||
}
|
||||
count = workers.size();
|
||||
}
|
||||
|
||||
RoleFitness( std::vector<ProcessClass> classes, ProcessClass::ClusterRole role ) : role(role) {
|
||||
worstFit = ProcessClass::BestFit;
|
||||
bestFit = ProcessClass::NeverAssign;
|
||||
for(auto it : classes) {
|
||||
auto thisFit = it.machineClassFitness( role );
|
||||
worstFit = std::max(worstFit, thisFit);
|
||||
bestFit = std::min(bestFit, thisFit);
|
||||
}
|
||||
count = classes.size();
|
||||
}
|
||||
|
||||
bool operator < (RoleFitness const& r) const {
|
||||
if (worstFit != r.worstFit) return worstFit < r.worstFit;
|
||||
if (worstIsDegraded != r.worstIsDegraded) return r.worstIsDegraded;
|
||||
// FIXME: TLog recruitment process does not guarantee the best fit is not worsened.
|
||||
if (role != ProcessClass::TLog && role != ProcessClass::LogRouter && bestFit != r.bestFit) return bestFit < r.bestFit;
|
||||
return count > r.count;
|
||||
|
@ -494,18 +503,21 @@ public:
|
|||
|
||||
bool betterFitness (RoleFitness const& r) const {
|
||||
if (worstFit != r.worstFit) return worstFit < r.worstFit;
|
||||
if (worstIsDegraded != r.worstIsDegraded) return r.worstFit;
|
||||
if (bestFit != r.bestFit) return bestFit < r.bestFit;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool betterCount (RoleFitness const& r) const {
|
||||
if(count > r.count) return true;
|
||||
return worstFit < r.worstFit;
|
||||
if(worstFit != r.worstFit) return worstFit < r.worstFit;
|
||||
if (worstIsDegraded != r.worstIsDegraded) return r.worstFit;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool operator == (RoleFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && count == r.count; }
|
||||
bool operator == (RoleFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && count == r.count && worstIsDegraded == r.worstIsDegraded; }
|
||||
|
||||
std::string toString() const { return format("%d %d %d", bestFit, worstFit, count); }
|
||||
std::string toString() const { return format("%d %d %d %d", bestFit, worstFit, count, worstIsDegraded); }
|
||||
};
|
||||
|
||||
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
|
||||
|
@ -1753,7 +1765,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
|
|||
self->db.setDistributor( di );
|
||||
}
|
||||
if( info == self->id_worker.end() ) {
|
||||
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, false );
|
||||
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, req.degraded );
|
||||
checkOutstandingRequests( self );
|
||||
return;
|
||||
}
|
||||
|
@ -1766,6 +1778,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
|
|||
info->second.details.processClass = newProcessClass;
|
||||
info->second.priorityInfo = newPriorityInfo;
|
||||
info->second.initialClass = req.initialClass;
|
||||
info->second.details.degraded = req.degraded;
|
||||
info->second.gen = req.generation;
|
||||
|
||||
if(info->second.details.interf.id() != w.id()) {
|
||||
|
|
Loading…
Reference in New Issue