Merge pull request #2344 from bnamasivayam/release-6.2
Fix bug where DD or RK could be halted and re-recruited in a loop for…
Commit: b1b5f88cb1
fdbserver/ClusterController.actor.cpp

@@ -1150,7 +1150,7 @@ public:
     return false;
   }

-  bool isProxyOrResolver(Optional<Key> processId) {
+  bool isProxyOrResolverOrCC(Optional<Key> processId) {
     ASSERT(masterProcessId.present());
     if (processId == masterProcessId) return false;

@@ -1161,6 +1161,8 @@ public:
     for (const ResolverInterface& interf: dbInfo.resolvers) {
       if (interf.locality.processId() == processId) return true;
     }
+    if (processId == clusterControllerProcessId) return true;
+
     return false;
   }

@@ -1170,7 +1172,7 @@ public:
     if ((role != ProcessClass::DataDistributor && role != ProcessClass::Ratekeeper) || pid == masterProcessId.get()) {
       return false;
     }
-    return isProxyOrResolver(pid);
+    return isProxyOrResolverOrCC(pid);
   }

   std::map< Optional<Standalone<StringRef>>, int> getUsedIds() {
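The hunks above widen the "already busy" predicate so that the cluster controller's own process is treated like a proxy or resolver host when deciding whether DataDistributor or Ratekeeper should be moved. A minimal standalone sketch of that predicate, using plain std::string ids and hypothetical type names in place of FoundationDB's Optional<Key> and interface types:

// Sketch only (not FDB's actual types): decide whether a process already
// hosts another latency-sensitive role and should therefore not keep DD/RK.
#include <iostream>
#include <string>
#include <vector>

struct ClusterStateSketch {
    std::string masterProcessId;
    std::string clusterControllerProcessId;
    std::vector<std::string> proxyProcessIds;     // stand-in for dbInfo.client.proxies
    std::vector<std::string> resolverProcessIds;  // stand-in for dbInfo.resolvers

    // Mirrors the renamed isProxyOrResolverOrCC(): the master process is never
    // flagged, but proxies, resolvers, and now the cluster controller are.
    bool isProxyOrResolverOrCC(const std::string& processId) const {
        if (processId == masterProcessId) return false;
        for (const auto& p : proxyProcessIds)
            if (p == processId) return true;
        for (const auto& r : resolverProcessIds)
            if (r == processId) return true;
        if (processId == clusterControllerProcessId) return true;  // the newly added case
        return false;
    }
};

int main() {
    ClusterStateSketch s{ "master-1", "cc-1", { "proxy-1" }, { "resolver-1" } };
    std::cout << s.isProxyOrResolverOrCC("cc-1") << "\n";      // 1: the CC process now counts
    std::cout << s.isProxyOrResolverOrCC("worker-7") << "\n";  // 0: an unencumbered worker
}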
@@ -1472,18 +1474,42 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
     return;
   }

-  auto& db = self->db.serverInfo->get().read();
-  auto bestFitnessForRK = self->getBestFitnessForRoleInDatacenter(ProcessClass::Ratekeeper);
-  auto bestFitnessForDD = self->getBestFitnessForRoleInDatacenter(ProcessClass::DataDistributor);
+  std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
+  WorkerDetails newRKWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::Ratekeeper, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
+  if (self->onMasterIsBetter(newRKWorker, ProcessClass::Ratekeeper)) {
+    id_used[self->masterProcessId]++;
+    id_used[newRKWorker.interf.locality.processId()]--;
+    newRKWorker = self->id_worker[self->masterProcessId.get()].details;
+  }
+  WorkerDetails newDDWorker = self->getWorkerForRoleInDatacenter(self->clusterControllerDcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used, true).worker;
+  if (self->onMasterIsBetter(newDDWorker, ProcessClass::DataDistributor)) {
+    newDDWorker = self->id_worker[self->masterProcessId.get()].details;
+  }
+  auto bestFitnessForRK = newRKWorker.processClass.machineClassFitness(ProcessClass::Ratekeeper);
+  if(self->db.config.isExcludedServer(newRKWorker.interf.address())) {
+    bestFitnessForRK = std::max(bestFitnessForRK, ProcessClass::ExcludeFit);
+  }
+  auto bestFitnessForDD = newDDWorker.processClass.machineClassFitness(ProcessClass::DataDistributor);
+  if(self->db.config.isExcludedServer(newDDWorker.interf.address())) {
+    bestFitnessForDD = std::max(bestFitnessForDD, ProcessClass::ExcludeFit);
+  }
+  //TraceEvent("CheckBetterDDorRKNewRecruits", self->id).detail("MasterProcessId", self->masterProcessId)
+  //.detail("NewRecruitRKProcessId", newRKWorker.interf.locality.processId()).detail("NewRecruiteDDProcessId", newDDWorker.interf.locality.processId());
+
+  Optional<Standalone<StringRef>> currentRKProcessId;
+  Optional<Standalone<StringRef>> currentDDProcessId;
+
+  auto& db = self->db.serverInfo->get().read();

   if (db.ratekeeper.present() && self->id_worker.count(db.ratekeeper.get().locality.processId()) &&
      (!self->recruitingRatekeeperID.present() || (self->recruitingRatekeeperID.get() == db.ratekeeper.get().id()))) {
     auto& rkWorker = self->id_worker[db.ratekeeper.get().locality.processId()];
+    currentRKProcessId = rkWorker.details.interf.locality.processId();
     auto rkFitness = rkWorker.details.processClass.machineClassFitness(ProcessClass::Ratekeeper);
     if(rkWorker.priorityInfo.isExcluded) {
       rkFitness = ProcessClass::ExcludeFit;
     }
-    if (self->isProxyOrResolver(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK) {
+    if (self->isProxyOrResolverOrCC(rkWorker.details.interf.locality.processId()) || rkFitness > bestFitnessForRK
+      || (rkFitness == bestFitnessForRK && rkWorker.details.interf.locality.processId() == self->masterProcessId && newRKWorker.interf.locality.processId() != self->masterProcessId)) {
       TraceEvent("CCHaltRK", self->id).detail("RKID", db.ratekeeper.get().id())
         .detail("Excluded", rkWorker.priorityInfo.isExcluded)
         .detail("Fitness", rkFitness).detail("BestFitness", bestFitnessForRK);
@@ -1494,13 +1520,19 @@ void checkBetterDDOrRK(ClusterControllerData* self) {
   if (!self->recruitingDistributor && db.distributor.present() && self->id_worker.count(db.distributor.get().locality.processId())) {
     auto& ddWorker = self->id_worker[db.distributor.get().locality.processId()];
     auto ddFitness = ddWorker.details.processClass.machineClassFitness(ProcessClass::DataDistributor);
+    currentDDProcessId = ddWorker.details.interf.locality.processId();
     if(ddWorker.priorityInfo.isExcluded) {
       ddFitness = ProcessClass::ExcludeFit;
     }
-    if (self->isProxyOrResolver(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD) {
+    if (self->isProxyOrResolverOrCC(ddWorker.details.interf.locality.processId()) || ddFitness > bestFitnessForDD
+      || (ddFitness == bestFitnessForDD && ddWorker.details.interf.locality.processId() == self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId)
+      || (ddFitness == bestFitnessForDD && (newRKWorker.interf.locality.processId() != newDDWorker.interf.locality.processId())
+      && (newRKWorker.interf.locality.processId() != self->masterProcessId && newDDWorker.interf.locality.processId() != self->masterProcessId) && (currentRKProcessId.present() && currentDDProcessId == currentRKProcessId))) {
       TraceEvent("CCHaltDD", self->id).detail("DDID", db.distributor.get().id())
         .detail("Excluded", ddWorker.priorityInfo.isExcluded)
-        .detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD);
+        .detail("Fitness", ddFitness).detail("BestFitness", bestFitnessForDD)
+        .detail("CurrentRateKeeperProcessId", currentRKProcessId.present() ? currentRKProcessId.get() : LiteralStringRef("None"))
+        .detail("CurrentDDProcessId", currentDDProcessId);
       ddWorker.haltDistributor = brokenPromiseToNever(db.distributor.get().haltDataDistributor.getReply(HaltDataDistributorRequest(self->id)));
     }
   }
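The checkBetterDDOrRK() hunks above are the heart of the fix: instead of comparing the incumbent against an abstract "best fitness for the role", the controller now recruits a concrete candidate worker and only halts the incumbent when the move is a real improvement, adding a tie-break for the equal-fitness case. A minimal sketch of that tie-break, with hypothetical simplified types (Fitness, RoleState) standing in for FDB's structures:

// Sketch only: why the equality guard stops the halt/re-recruit loop.
#include <iostream>
#include <string>

enum Fitness { Best = 0, Good = 1, Okay = 2, ExcludeFit = 3 };  // lower is better

struct RoleState {
    std::string currentProcess;    // where the role (DD or RK) runs now
    Fitness currentFitness;
    std::string candidateProcess;  // where a fresh recruitment would place it
    Fitness candidateFitness;
    std::string masterProcess;
    bool currentHostsOtherRole;    // proxy/resolver/CC co-location
};

// Halt (and re-recruit) only if the move is a genuine improvement, or if an
// equal-fitness move takes the role off the master process. Without the
// equality guard, a role whose fitness already matches the best candidate's
// could be halted and then recruited right back onto the same process, forever.
bool shouldHaltAndRerecruit(const RoleState& r) {
    if (r.currentHostsOtherRole) return true;
    if (r.currentFitness > r.candidateFitness) return true;
    return r.currentFitness == r.candidateFitness &&
           r.currentProcess == r.masterProcess &&
           r.candidateProcess != r.masterProcess;
}

int main() {
    // Equal fitness, candidate is the same process: no halt, so no loop.
    RoleState stable{ "worker-3", Good, "worker-3", Good, "master-1", false };
    std::cout << shouldHaltAndRerecruit(stable) << "\n";    // 0

    // Equal fitness, but the role currently shares the master process and a
    // different process is available: halting is worthwhile.
    RoleState onMaster{ "master-1", Good, "worker-3", Good, "master-1", false };
    std::cout << shouldHaltAndRerecruit(onMaster) << "\n";  // 1
}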
fdbserver/SimulatedCluster.actor.cpp

@@ -1246,6 +1246,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo

   bool requiresExtraDBMachines = extraDB && g_simulator.extraDB->toString() != conn.toString();
   int assignedMachines = 0, nonVersatileMachines = 0;
+  std::vector<ProcessClass::ClassType> processClassesSubSet = {ProcessClass::UnsetClass, ProcessClass::ResolutionClass, ProcessClass::MasterClass};
   for( int dc = 0; dc < dataCenters; dc++ ) {
     //FIXME: test unset dcID
     Optional<Standalone<StringRef>> dcUID = StringRef(format("%d", dc));
@@ -1270,7 +1271,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
       if(assignedMachines < 4)
         processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 2), ProcessClass::CommandLineSource); //Unset or Storage
       else if(assignedMachines == 4 && !simconfig.db.regions.size())
-        processClass = ProcessClass((ProcessClass::ClassType) (deterministicRandom()->randomInt(0, 2) * ProcessClass::ResolutionClass), ProcessClass::CommandLineSource); //Unset or Resolution
+        processClass = ProcessClass(processClassesSubSet[deterministicRandom()->randomInt(0, processClassesSubSet.size())], ProcessClass::CommandLineSource); //Unset or Resolution or Master
       else
         processClass = ProcessClass((ProcessClass::ClassType) deterministicRandom()->randomInt(0, 3), ProcessClass::CommandLineSource); //Unset, Storage, or Transaction
       if (processClass == ProcessClass::ResolutionClass) // *can't* be assigned to other roles, even in an emergency
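The simulator change above draws the fifth machine's class from an explicit subset (Unset, Resolution, Master) instead of scaling a random integer. A standalone illustration of that selection, using std::mt19937 as a stand-in for FoundationDB's deterministicRandom() and a simplified copy of the enum (both assumptions of the sketch):

// Sketch: choose a simulated machine's process class uniformly from an
// explicit subset, as the new SimulatedCluster code does.
#include <iostream>
#include <random>
#include <vector>

enum class ClassType { UnsetClass, ResolutionClass, MasterClass };

int main() {
    std::vector<ClassType> processClassesSubSet = {
        ClassType::UnsetClass, ClassType::ResolutionClass, ClassType::MasterClass };

    std::mt19937 rng(42);  // fixed seed, mimicking the simulator's deterministic RNG
    // Equivalent of deterministicRandom()->randomInt(0, processClassesSubSet.size()).
    std::uniform_int_distribution<size_t> pick(0, processClassesSubSet.size() - 1);

    ClassType chosen = processClassesSubSet[pick(rng)];
    std::cout << static_cast<int>(chosen) << "\n";  // 0, 1, or 2
}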
fdbserver/workloads/ConsistencyCheck.actor.cpp

@@ -1424,17 +1424,16 @@ struct ConsistencyCheckWorkload : TestWorkload
       }

       // Check DataDistributor
-      ProcessClass::Fitness bestDistributorFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::DataDistributor);
-      if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) != bestDistributorFitness)) {
-        TraceEvent("ConsistencyCheck_DistributorNotBest").detail("BestDataDistributorFitness", bestDistributorFitness)
+      ProcessClass::Fitness fitnessLowerBound = allWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::DataDistributor);
+      if (db.distributor.present() && (!nonExcludedWorkerProcessMap.count(db.distributor.get().address()) || nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) > fitnessLowerBound)) {
+        TraceEvent("ConsistencyCheck_DistributorNotBest").detail("DataDistributorFitnessLowerBound", fitnessLowerBound)
          .detail("ExistingDistributorFitness", nonExcludedWorkerProcessMap.count(db.distributor.get().address()) ? nonExcludedWorkerProcessMap[db.distributor.get().address()].processClass.machineClassFitness(ProcessClass::DataDistributor) : -1);
        return false;
       }

       // Check Ratekeeper
-      ProcessClass::Fitness bestRatekeeperFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Ratekeeper);
-      if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) != bestRatekeeperFitness)) {
-        TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", bestRatekeeperFitness)
+      if (db.ratekeeper.present() && (!nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) || nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) > fitnessLowerBound)) {
+        TraceEvent("ConsistencyCheck_RatekeeperNotBest").detail("BestRatekeeperFitness", fitnessLowerBound)
          .detail("ExistingRatekeeperFitness", nonExcludedWorkerProcessMap.count(db.ratekeeper.get().address()) ? nonExcludedWorkerProcessMap[db.ratekeeper.get().address()].processClass.machineClassFitness(ProcessClass::Ratekeeper) : -1);
        return false;
       }
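The consistency-check hunk relaxes the test: instead of requiring DataDistributor and Ratekeeper to sit on a process with exactly the single best available fitness, it only flags them when their fitness is strictly worse than what the master's process would offer for that role (a lower bound). A small sketch of that comparison, with hypothetical fitness values:

// Sketch: the relaxed acceptance rule used by ConsistencyCheckWorkload after
// this change. Fitness is an integer here; lower means better, as in FDB.
#include <iostream>

// The placement is rejected only if it is strictly worse than the fitness the
// master's own process would offer for that role (the lower bound); the old
// check instead failed whenever the fitness differed from the best available.
bool placementAcceptable(int roleFitness, int masterProcessFitnessForRole) {
    return roleFitness <= masterProcessFitnessForRole;
}

int main() {
    std::cout << placementAcceptable(/*roleFitness=*/1, /*lowerBound=*/2) << "\n";  // 1: equal or better is fine
    std::cout << placementAcceptable(/*roleFitness=*/3, /*lowerBound=*/2) << "\n";  // 0: strictly worse fails
}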