Fix bug where DD or RK could be halted and re-recruited in a loop for certain valid process class configurations. Specifically, recruitment of DD or RK takes into account that master process is preferred over proxy, resolver or cc.

But check for better DD only looks for better machine class ignoring that the new recruit could share a proxy or resolver or CC. Also try to balance the distribution of the DD and RK role if there are enough processes to do so.
This commit is contained in:
Balachandar Namasivayam 2019-11-12 14:22:36 -08:00
parent d76070e37d
commit f5282f2c7e
3 changed files with 47 additions and 22 deletions

View File

@ -1150,7 +1150,7 @@ public:
return false;
}
bool isProxyOrResolver(Optional<Key> processId) {
bool isProxyOrResolverOrCC(Optional<Key> processId) {
ASSERT(masterProcessId.present());
if (processId == masterProcessId) return false;
@ -1161,6 +1161,8 @@ public:
for (const ResolverInterface& interf: dbInfo.resolvers) {
if (interf.locality.processId() == processId) return true;
}
if (processId == clusterControllerProcessId) return true;
return false;
}
@ -1170,7 +1172,7 @@ public:
if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper) || pid == masterProcessId.get()) {
return false;
}
return isProxyOrResolver(pid);
return isProxyOrResolverOrCC(pid);
}
std::map< Optional<Standalone<StringRef>>, int> getUsedIds() {
@ -1472,18 +1474,36 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
return;
}
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
WorkerDetails newRKWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::Ratekeeper, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
if (self->onMasterIsBetter(newRKWorker, ProcessClass::Ratekeeper)) {
newRKWorker = self->id_worker[self->masterProcessId.get()].details;
}
WorkerDetails newDDWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
if (self->onMasterIsBetter(newDDWorker, ProcessClass::DataDistributor)) {
newDDWorker = self->id_worker[self->masterProcessId.get()].details;
}
auto bestFitnessForRK = newRKWorker.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(self->db.config.isExcludedServer(newRKWorker.interf.address())) {
bestFitnessForRK = std::max(bestFitnessForRK, ProcessClass::ExcludeFit);
}
auto bestFitnessForDD = newDDWorker.processClass.machineClassFitness(ProcessClass::DataDistributor);
if(self->db.config.isExcludedServer(newDDWorker.interf.address())) {
bestFitnessForDD = std::max(bestFitnessForDD, ProcessClass::ExcludeFit);
}
Optional<Standalone<StringRef>> currentRKProcessId;
Optional<Standalone<StringRef>> currentDDProcessId;
auto& db = self->db.serverInfo->get().read();
auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::Ratekeeper);
auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor);
if (db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId()) &&
(!self->recruitingRatekeeperID.present() || (self->recruitingRatekeeperID.get() == db.ratekeeper.get().id()))) {
auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
currentRKProcessId = rkWorker.details.interf.locality.processId();
auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::Ratekeeper);
if(rkWorker.priorityInfo.isExcluded) {
rkFitness = ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK) {
if (self->isProxyOrResolverOrCC(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK
|| (rkFitness == bestFitnessForRK && rkWorker.details.interf.locality.processId() == self->masterProcessId && newRKWorker.interf.locality.processId() != self->masterProcessId)) {
TraceEvent("CCHaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
.detail("Excluded", rkWorker.priorityInfo.isExcluded)
.detail("Fitness", rkFitness).detail("BestFitness", bestFitnessForRK);
@ -1494,10 +1514,13 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
if (!self->recruitingDistributor && db.distributor.present() && self->id_worker.count(db.distributor.get().locality.processId())) {
auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
currentDDProcessId = ddWorker.details.interf.locality.processId();
if(ddWorker.priorityInfo.isExcluded) {
ddFitness = ProcessClass::ExcludeFit;
}
if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD) {
if (self->isProxyOrResolverOrCC(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD
|| (ddFitness == bestFitnessForDD && ddWorker.details.interf.locality.processId() == self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId)
|| (ddFitness == bestFitnessForDD && (newRKWorker.interf.locality.processId() != newDDWorker.interf.locality.processId()) && (currentDDProcessId.present() && currentRKProcessId.present() && currentDDProcessId == currentRKProcessId))) {
TraceEvent("CCHaltDD", self->id).detail("DDID", db.distributor.get().id())
.detail("Excluded", ddWorker.priorityInfo.isExcluded)
.detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD);

View File

@ -1246,6 +1246,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString();
int assignedMachines = 0, nonVersatileMachines = 0;
std::vector<ProcessClass::ClassType> processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass};
for( int dc = 0; dc < dataCenters; dc++ ) {
//FIXME: test unset dcID
Optional<Standalone<StringRef>> dcUID = StringRef(format("%d", dc));
@ -1270,7 +1271,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
if(assignedMachines < 4)
processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 2), ProcessClass::CommandLineSource); //Unset or Storage
else if(assignedMachines == 4 && !simconfig.db.regions.size())
processClass = ProcessClass((ProcessClass::ClassType) (deterministicRandom()->randomInt(0, 2) * ProcessClass::ResolutionClass), ProcessClass::CommandLineSource); //Unset or Resolution
processClass = ProcessClass(processClassesSubSet[deterministicRandom()->randomInt(0, processClassesSubSet.size())], ProcessClass::CommandLineSource); //Unset or Resolution or Master
else
processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 3), ProcessClass::CommandLineSource); //Unset, Storage, or Transaction
if (processClass == ProcessClass::ResolutionClass) // *can't* be assigned to other roles, even in an emergency

View File

@ -1423,21 +1423,22 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
// Check DataDistributor
ProcessClass::Fitness bestDistributorFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::DataDistributor);
if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) != bestDistributorFitness)) {
TraceEvent("ConsistencyCheck_DistributorNotBest").detail("BestDataDistributorFitness", bestDistributorFitness)
.detail("ExistingDistributorFitness", nonExcludedWorkerProcessMap.count(db.distributor.get().address()) ? nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) : -1);
return false;
}
// TODO: Need more sophisticated checks for DD and Ratekeeper
// // Check DataDistributor
// ProcessClass::Fitness bestDistributorFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::DataDistributor);
// if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) != bestDistributorFitness)) {
// TraceEvent("ConsistencyCheck_DistributorNotBest").detail("BestDataDistributorFitness", bestDistributorFitness)
// .detail("ExistingDistributorFitness", nonExcludedWorkerProcessMap.count(db.distributor.get().address()) ? nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) : -1);
// return false;
// }
// Check Ratekeeper
ProcessClass::Fitness bestRatekeeperFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Ratekeeper);
if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) != bestRatekeeperFitness)) {
TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", bestRatekeeperFitness)
.detail("ExistingRatekeeperFitness", nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) ? nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) : -1);
return false;
}
// // Check Ratekeeper
// ProcessClass::Fitness bestRatekeeperFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Ratekeeper);
// if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) != bestRatekeeperFitness)) {
// TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", bestRatekeeperFitness)
// .detail("ExistingRatekeeperFitness", nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) ? nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) : -1);
// return false;
// }
// TODO: Check Tlog