Re-enable CheckDesiredClasses after making necessary changes for multi-region setup.
Fixed a couple of bugs 1) A rare race condition where a worker is being roles even after it died. 2) Fix how RoleFitness is calculated for TLog and LogRouter. Only worst fitness is compared to see if a better fit is available.
This commit is contained in:
parent
acfb3bd2de
commit
a8e2e75cd5
|
@ -28,7 +28,7 @@ struct ProcessClass {
|
||||||
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
|
// This enum is stored in restartInfo.ini for upgrade tests, so be very careful about changing the existing items!
|
||||||
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, InvalidClass = -1 };
|
enum ClassType { UnsetClass, StorageClass, TransactionClass, ResolutionClass, TesterClass, ProxyClass, MasterClass, StatelessClass, LogClass, ClusterControllerClass, LogRouterClass, InvalidClass = -1 };
|
||||||
enum Fitness { BestFit, GoodFit, UnsetFit, OkayFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask
|
enum Fitness { BestFit, GoodFit, UnsetFit, OkayFit, WorstFit, ExcludeFit, NeverAssign }; //cannot be larger than 7 because of leader election mask
|
||||||
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController };
|
enum ClusterRole { Storage, TLog, Proxy, Master, Resolver, LogRouter, ClusterController, NoRole };
|
||||||
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
|
enum ClassSource { CommandLineSource, AutoSource, DBSource, InvalidSource = -1 };
|
||||||
int16_t _class;
|
int16_t _class;
|
||||||
int16_t _source;
|
int16_t _source;
|
||||||
|
|
|
@ -438,17 +438,18 @@ public:
|
||||||
struct RoleFitness {
|
struct RoleFitness {
|
||||||
ProcessClass::Fitness bestFit;
|
ProcessClass::Fitness bestFit;
|
||||||
ProcessClass::Fitness worstFit;
|
ProcessClass::Fitness worstFit;
|
||||||
|
ProcessClass::ClusterRole role;
|
||||||
int count;
|
int count;
|
||||||
|
|
||||||
RoleFitness(int bestFit, int worstFit, int count) : bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), count(count) {}
|
RoleFitness(int bestFit, int worstFit, int count, ProcessClass::ClusterRole role) : bestFit((ProcessClass::Fitness)bestFit), worstFit((ProcessClass::Fitness)worstFit), count(count), role(role) {}
|
||||||
|
|
||||||
RoleFitness(int fitness, int count) : bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), count(count) {}
|
RoleFitness(int fitness, int count, ProcessClass::ClusterRole role) : bestFit((ProcessClass::Fitness)fitness), worstFit((ProcessClass::Fitness)fitness), count(count), role(role) {}
|
||||||
|
|
||||||
RoleFitness() : bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), count(0) {}
|
RoleFitness() : bestFit(ProcessClass::NeverAssign), worstFit(ProcessClass::NeverAssign), role(ProcessClass::NoRole), count(0) {}
|
||||||
|
|
||||||
RoleFitness(RoleFitness first, RoleFitness second) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count) {}
|
RoleFitness(RoleFitness first, RoleFitness second, ProcessClass::ClusterRole role) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count), role(role) { }
|
||||||
|
|
||||||
RoleFitness( vector<std::pair<WorkerInterface, ProcessClass>> workers, ProcessClass::ClusterRole role ) {
|
RoleFitness( vector<std::pair<WorkerInterface, ProcessClass>> workers, ProcessClass::ClusterRole role ) : role(role) {
|
||||||
worstFit = ProcessClass::BestFit;
|
worstFit = ProcessClass::BestFit;
|
||||||
bestFit = ProcessClass::NeverAssign;
|
bestFit = ProcessClass::NeverAssign;
|
||||||
for(auto it : workers) {
|
for(auto it : workers) {
|
||||||
|
@ -459,7 +460,7 @@ public:
|
||||||
count = workers.size();
|
count = workers.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
RoleFitness( std::vector<ProcessClass> classes, ProcessClass::ClusterRole role ) {
|
RoleFitness( std::vector<ProcessClass> classes, ProcessClass::ClusterRole role ) : role(role) {
|
||||||
worstFit = ProcessClass::BestFit;
|
worstFit = ProcessClass::BestFit;
|
||||||
bestFit = ProcessClass::NeverAssign;
|
bestFit = ProcessClass::NeverAssign;
|
||||||
for(auto it : classes) {
|
for(auto it : classes) {
|
||||||
|
@ -472,7 +473,8 @@ public:
|
||||||
|
|
||||||
bool operator < (RoleFitness const& r) const {
|
bool operator < (RoleFitness const& r) const {
|
||||||
if (worstFit != r.worstFit) return worstFit < r.worstFit;
|
if (worstFit != r.worstFit) return worstFit < r.worstFit;
|
||||||
if (bestFit != r.bestFit) return bestFit < r.bestFit;
|
// FIXME: TLog recruitment process does not guarantee the best fit is not worsened.
|
||||||
|
if ((role != ProcessClass::TLog || role != ProcessClass::LogRouter) && bestFit != r.bestFit) return bestFit < r.bestFit;
|
||||||
return count > r.count;
|
return count > r.count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -489,7 +491,7 @@ public:
|
||||||
|
|
||||||
bool operator == (RoleFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && count == r.count; }
|
bool operator == (RoleFitness const& r) const { return worstFit == r.worstFit && bestFit == r.bestFit && count == r.count; }
|
||||||
|
|
||||||
std::string toString() const { return format("%d %d &d", bestFit, worstFit, count); }
|
std::string toString() const { return format("%d %d %d", bestFit, worstFit, count); }
|
||||||
};
|
};
|
||||||
|
|
||||||
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
|
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
|
||||||
|
@ -532,8 +534,8 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
|
if( now() - remoteStartTime.get() < SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY &&
|
||||||
( ( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredRemoteLogs()).betterCount(RoleFitness(remoteLogs, ProcessClass::TLog)) ) ||
|
( ( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredRemoteLogs(), ProcessClass::TLog).betterCount(RoleFitness(remoteLogs, ProcessClass::TLog)) ) ||
|
||||||
( RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.logRouterCount).betterCount(RoleFitness(logRouters, ProcessClass::LogRouter)) ) ) ) {
|
( RoleFitness(SERVER_KNOBS->EXPECTED_LOG_ROUTER_FITNESS, req.logRouterCount, ProcessClass::LogRouter).betterCount(RoleFitness(logRouters, ProcessClass::LogRouter)) ) ) ) {
|
||||||
throw operation_failed();
|
throw operation_failed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -600,10 +602,10 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
||||||
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
||||||
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId)).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
|
( region.satelliteTLogReplicationFactor > 0 && RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredSatelliteLogs(dcId), ProcessClass::TLog).betterCount(RoleFitness(satelliteLogs, ProcessClass::TLog)) ) ||
|
||||||
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies()).betterCount(RoleFitness(proxies, ProcessClass::Proxy)) ||
|
RoleFitness(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, req.configuration.getDesiredProxies(), ProcessClass::Proxy).betterCount(RoleFitness(proxies, ProcessClass::Proxy)) ||
|
||||||
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers()).betterCount(RoleFitness(resolvers, ProcessClass::Resolver)) ) ) {
|
RoleFitness(SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS, req.configuration.getDesiredResolvers(), ProcessClass::Resolver).betterCount(RoleFitness(resolvers, ProcessClass::Resolver)) ) ) {
|
||||||
return operation_failed();
|
return operation_failed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,7 +707,7 @@ public:
|
||||||
proxies.push_back(first_proxy.worker);
|
proxies.push_back(first_proxy.worker);
|
||||||
resolvers.push_back(first_resolver.worker);
|
resolvers.push_back(first_resolver.worker);
|
||||||
|
|
||||||
auto fitness = RoleFitness( RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver) );
|
auto fitness = RoleFitness( RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver), ProcessClass::NoRole );
|
||||||
|
|
||||||
if(dcId == clusterControllerDcId) {
|
if(dcId == clusterControllerDcId) {
|
||||||
bestFitness = fitness;
|
bestFitness = fitness;
|
||||||
|
@ -750,8 +752,8 @@ public:
|
||||||
.detail("DesiredResolvers", req.configuration.getDesiredResolvers()).detail("ActualResolvers", result.resolvers.size());
|
.detail("DesiredResolvers", req.configuration.getDesiredResolvers()).detail("ActualResolvers", result.resolvers.size());
|
||||||
|
|
||||||
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
|
||||||
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs()).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
( RoleFitness(SERVER_KNOBS->EXPECTED_TLOG_FITNESS, req.configuration.getDesiredLogs(), ProcessClass::TLog).betterCount(RoleFitness(tlogs, ProcessClass::TLog)) ||
|
||||||
RoleFitness(std::min(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), std::max(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), req.configuration.getDesiredProxies()+req.configuration.getDesiredResolvers()).betterCount(bestFitness) ) ) {
|
RoleFitness(std::min(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), std::max(SERVER_KNOBS->EXPECTED_PROXY_FITNESS, SERVER_KNOBS->EXPECTED_RESOLVER_FITNESS), req.configuration.getDesiredProxies()+req.configuration.getDesiredResolvers(), ProcessClass::NoRole).betterCount(bestFitness) ) ) {
|
||||||
throw operation_failed();
|
throw operation_failed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -945,10 +947,11 @@ public:
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
|
RoleFitness oldRemoteTLogFit(remote_tlogs, ProcessClass::TLog);
|
||||||
RoleFitness newRemoteTLogFit((db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ? getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC) : remote_tlogs, ProcessClass::TLog);
|
RoleFitness newRemoteTLogFit(
|
||||||
|
(db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ?
|
||||||
|
getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC)
|
||||||
|
: remote_tlogs, ProcessClass::TLog);
|
||||||
if(oldRemoteTLogFit < newRemoteTLogFit) return false;
|
if(oldRemoteTLogFit < newRemoteTLogFit) return false;
|
||||||
|
|
||||||
int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count));
|
int oldRouterCount = oldTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,oldTLogFit.count));
|
||||||
int newRouterCount = newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,newTLogFit.count));
|
int newRouterCount = newTLogFit.count * std::max<int>(1, db.config.desiredLogRouterCount / std::max(1,newTLogFit.count));
|
||||||
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter);
|
RoleFitness oldLogRoutersFit(log_routers, ProcessClass::LogRouter);
|
||||||
|
@ -960,11 +963,9 @@ public:
|
||||||
if(newLogRoutersFit.count < newRouterCount) {
|
if(newLogRoutersFit.count < newRouterCount) {
|
||||||
newLogRoutersFit.worstFit = ProcessClass::NeverAssign;
|
newLogRoutersFit.worstFit = ProcessClass::NeverAssign;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(oldLogRoutersFit < newLogRoutersFit) return false;
|
if(oldLogRoutersFit < newLogRoutersFit) return false;
|
||||||
|
|
||||||
// Check proxy/resolver fitness
|
// Check proxy/resolver fitness
|
||||||
RoleFitness oldInFit(RoleFitness(proxyClasses, ProcessClass::Proxy), RoleFitness(resolverClasses, ProcessClass::Resolver));
|
RoleFitness oldInFit(RoleFitness(proxyClasses, ProcessClass::Proxy), RoleFitness(resolverClasses, ProcessClass::Resolver), ProcessClass::NoRole);
|
||||||
|
|
||||||
auto first_resolver = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
|
auto first_resolver = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Resolver, ProcessClass::ExcludeFit, db.config, id_used, true );
|
||||||
auto first_proxy = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
|
auto first_proxy = getWorkerForRoleInDatacenter( clusterControllerDcId, ProcessClass::Proxy, ProcessClass::ExcludeFit, db.config, id_used, true );
|
||||||
|
@ -974,10 +975,8 @@ public:
|
||||||
proxies.push_back(first_proxy.worker);
|
proxies.push_back(first_proxy.worker);
|
||||||
resolvers.push_back(first_resolver.worker);
|
resolvers.push_back(first_resolver.worker);
|
||||||
|
|
||||||
RoleFitness newInFit(RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver));
|
RoleFitness newInFit(RoleFitness(proxies, ProcessClass::Proxy), RoleFitness(resolvers, ProcessClass::Resolver), ProcessClass::NoRole);
|
||||||
|
|
||||||
if(oldInFit.betterFitness(newInFit)) return false;
|
if(oldInFit.betterFitness(newInFit)) return false;
|
||||||
|
|
||||||
if(oldTLogFit > newTLogFit || oldInFit > newInFit || (oldSatelliteFallback && !newSatelliteFallback) || oldSatelliteTLogFit > newSatelliteTLogFit || oldRemoteTLogFit > newRemoteTLogFit || oldLogRoutersFit > newLogRoutersFit) {
|
if(oldTLogFit > newTLogFit || oldInFit > newInFit || (oldSatelliteFallback && !newSatelliteFallback) || oldSatelliteTLogFit > newSatelliteTLogFit || oldRemoteTLogFit > newRemoteTLogFit || oldLogRoutersFit > newLogRoutersFit) {
|
||||||
TraceEvent("BetterMasterExists", id).detail("OldMasterFit", oldMasterFit).detail("NewMasterFit", mworker.fitness)
|
TraceEvent("BetterMasterExists", id).detail("OldMasterFit", oldMasterFit).detail("NewMasterFit", mworker.fitness)
|
||||||
.detail("OldTLogFit", oldTLogFit.toString()).detail("NewTLogFit", newTLogFit.toString())
|
.detail("OldTLogFit", oldTLogFit.toString()).detail("NewTLogFit", newTLogFit.toString())
|
||||||
|
@ -1321,6 +1320,9 @@ ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Stan
|
||||||
ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) {
|
ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) {
|
||||||
state Future<Void> failed = worker.address() == g_network->getLocalAddress() ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
|
state Future<Void> failed = worker.address() == g_network->getLocalAddress() ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
|
||||||
cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) );
|
cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) );
|
||||||
|
// This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch fails for the worker.
|
||||||
|
wait(delay(0));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
choose {
|
choose {
|
||||||
when( wait( IFailureMonitor::failureMonitor().onStateEqual( worker.storage.getEndpoint(), FailureStatus(IFailureMonitor::failureMonitor().getState( worker.storage.getEndpoint() ).isAvailable()) ) ) ) {
|
when( wait( IFailureMonitor::failureMonitor().onStateEqual( worker.storage.getEndpoint(), FailureStatus(IFailureMonitor::failureMonitor().getState( worker.storage.getEndpoint() ).isAvailable()) ) ) ) {
|
||||||
|
|
|
@ -242,10 +242,9 @@ struct ConsistencyCheckWorkload : TestWorkload
|
||||||
bool hasExtraStores = wait( self->checkForExtraDataStores(cx, self) );
|
bool hasExtraStores = wait( self->checkForExtraDataStores(cx, self) );
|
||||||
|
|
||||||
//Check that each machine is operating as its desired class
|
//Check that each machine is operating as its desired class
|
||||||
//FIXME: re-enable
|
bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
|
||||||
//bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
|
if(!usingDesiredClasses)
|
||||||
//if(!usingDesiredClasses)
|
self->testFailure("Cluster has machine(s) not using requested classes");
|
||||||
// self->testFailure("Cluster has machine(s) not using requested classes");
|
|
||||||
|
|
||||||
bool workerListCorrect = wait( self->checkWorkerList(cx, self) );
|
bool workerListCorrect = wait( self->checkWorkerList(cx, self) );
|
||||||
if(!workerListCorrect)
|
if(!workerListCorrect)
|
||||||
|
@ -1202,7 +1201,7 @@ struct ConsistencyCheckWorkload : TestWorkload
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static ProcessClass::Fitness getBestAvailableFitness(std::set<ProcessClass::ClassType>& availableClassTypes, ProcessClass::ClusterRole role) {
|
static ProcessClass::Fitness getBestAvailableFitness(std::vector<ProcessClass::ClassType>& availableClassTypes, ProcessClass::ClusterRole role) {
|
||||||
ProcessClass::Fitness bestAvailableFitness = ProcessClass::NeverAssign;
|
ProcessClass::Fitness bestAvailableFitness = ProcessClass::NeverAssign;
|
||||||
for (auto classType : availableClassTypes) {
|
for (auto classType : availableClassTypes) {
|
||||||
bestAvailableFitness = std::min(bestAvailableFitness, ProcessClass(classType, ProcessClass::InvalidSource).machineClassFitness(role));
|
bestAvailableFitness = std::min(bestAvailableFitness, ProcessClass(classType, ProcessClass::InvalidSource).machineClassFitness(role));
|
||||||
|
@ -1211,67 +1210,139 @@ struct ConsistencyCheckWorkload : TestWorkload
|
||||||
return bestAvailableFitness;
|
return bestAvailableFitness;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <class T>
|
||||||
|
static std::string getOptionalString(Optional<T> opt) {
|
||||||
|
if (opt.present())
|
||||||
|
return opt.get().toString();
|
||||||
|
return "NotSet";
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef std::pair<WorkerInterface, ProcessClass> WorkerClassPair;
|
||||||
//Returns true if all machines in the cluster that specified a desired class are operating in that class
|
//Returns true if all machines in the cluster that specified a desired class are operating in that class
|
||||||
ACTOR Future<bool> checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self)
|
ACTOR Future<bool> checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self) {
|
||||||
{
|
state Optional<Key> expectedPrimaryDcId;
|
||||||
state vector<std::pair<WorkerInterface, ProcessClass>> allWorkers = wait( getWorkers( self->dbInfo ) );
|
state Optional<Key> expectedRemoteDcId;
|
||||||
state vector<std::pair<WorkerInterface, ProcessClass>> nonExcludedWorkers = wait( getWorkers( self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) );
|
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
|
||||||
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
|
state vector<WorkerClassPair> allWorkers = wait(getWorkers(self->dbInfo));
|
||||||
|
state vector<WorkerClassPair> nonExcludedWorkers = wait(getWorkers(self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY));
|
||||||
auto& db = self->dbInfo->get();
|
auto& db = self->dbInfo->get();
|
||||||
|
|
||||||
std::set<ProcessClass::ClassType> allClassTypes;
|
std::map<NetworkAddress, WorkerClassPair> allWorkerProcessMap;
|
||||||
std::map<NetworkAddress, ProcessClass> allWorkerProcessMap;
|
std::map<Optional<Key>, std::vector<ProcessClass::ClassType>> dcToAllClassTypes;
|
||||||
for (auto worker : allWorkers) {
|
for (auto worker : allWorkers) {
|
||||||
allClassTypes.insert(worker.second.classType());
|
allWorkerProcessMap[worker.first.address()] = worker;
|
||||||
allWorkerProcessMap[worker.first.address()] = worker.second;
|
Optional<Key> dc = worker.first.locality._data[LocalityData::keyDcId];
|
||||||
|
if (!dcToAllClassTypes.count(dc))
|
||||||
|
dcToAllClassTypes.insert({});
|
||||||
|
dcToAllClassTypes[dc].push_back(worker.second.classType());
|
||||||
}
|
}
|
||||||
|
|
||||||
std::set<ProcessClass::ClassType> nonExcludedClassTypes;
|
std::map<NetworkAddress, WorkerClassPair> nonExcludedWorkerProcessMap;
|
||||||
std::map<NetworkAddress, ProcessClass> nonExcludedWorkerProcessMap;
|
std::map<Optional<Key>, std::vector<ProcessClass::ClassType>> dcToNonExcludedClassTypes;
|
||||||
for (auto worker : nonExcludedWorkers) {
|
for (auto worker : nonExcludedWorkers) {
|
||||||
nonExcludedClassTypes.insert(worker.second.classType());
|
nonExcludedWorkerProcessMap[worker.first.address()] = worker;
|
||||||
nonExcludedWorkerProcessMap[worker.first.address()] = worker.second;
|
Optional<Key> dc = worker.first.locality._data[LocalityData::keyDcId];
|
||||||
|
if (!dcToNonExcludedClassTypes.count(dc))
|
||||||
|
dcToNonExcludedClassTypes.insert({});
|
||||||
|
dcToNonExcludedClassTypes[dc].push_back(worker.second.classType());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check cluster controller
|
if (!allWorkerProcessMap.count(db.clusterInterface.clientInterface.address())) {
|
||||||
ProcessClass::Fitness bestClusterControllerFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::ClusterController);
|
TraceEvent("ConsistencyCheck_CCNotInWorkerList").detail("CCAddress", db.clusterInterface.clientInterface.address().toString());
|
||||||
if (!nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) || nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].machineClassFitness(ProcessClass::ClusterController) != bestClusterControllerFitness) {
|
return false;
|
||||||
TraceEvent("ConsistencyCheck_ClusterControllerNotBest").detail("BestClusterControllerFitness", bestClusterControllerFitness).detail("ExistingClusterControllerFit", nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) ? nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].machineClassFitness(ProcessClass::ClusterController) : -1);
|
}
|
||||||
|
if (!allWorkerProcessMap.count(db.master.address())) {
|
||||||
|
TraceEvent("ConsistencyCheck_MasterNotInWorkerList").detail("MasterAddress", db.master.address().toString());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check master
|
Optional<Key> ccDcId = allWorkerProcessMap[db.clusterInterface.clientInterface.address()].first.locality._data[LocalityData::keyDcId];
|
||||||
ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::Master);
|
Optional<Key> masterDcId = allWorkerProcessMap[db.master.address()].first.locality._data[LocalityData::keyDcId];
|
||||||
|
|
||||||
|
if (ccDcId != masterDcId) {
|
||||||
|
TraceEvent("ConsistencyCheck_CCAndMasterNotInSameDC").detail("ClusterControllerDcId", getOptionalString(ccDcId)).detail("MasterDcId", getOptionalString(masterDcId));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Check if master and cluster controller are in the desired DC for fearless cluster when running under simulation
|
||||||
|
// FIXME: g_simulator.datacenterDead could return false positives. Relaxing checks until it is fixed.
|
||||||
|
if (g_network->isSimulated() && config.usableRegions> 1 && g_simulator.primaryDcId.present() &&
|
||||||
|
!g_simulator.datacenterDead(g_simulator.primaryDcId) && !g_simulator.datacenterDead(g_simulator.remoteDcId)) {
|
||||||
|
expectedPrimaryDcId = config.regions[0].dcId;
|
||||||
|
expectedRemoteDcId = config.regions[1].dcId;
|
||||||
|
// If the priorities are equal, either could be the primary
|
||||||
|
if (config.regions[0].priority == config.regions[1].priority) {
|
||||||
|
expectedPrimaryDcId = masterDcId;
|
||||||
|
expectedRemoteDcId = config.regions[0].dcId == expectedPrimaryDcId.get() ? config.regions[1].dcId : config.regions[0].dcId;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ccDcId != expectedPrimaryDcId) {
|
||||||
|
TraceEvent("ConsistencyCheck_ClusterControllerDcNotBest").detail("PreferredDcId", getOptionalString(expectedPrimaryDcId)).detail("ExistingDcId", getOptionalString(ccDcId));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (masterDcId != expectedPrimaryDcId) {
|
||||||
|
TraceEvent("ConsistencyCheck_MasterDcNotBest").detail("PreferredDcId", getOptionalString(expectedPrimaryDcId)).detail("ExistingDcId", getOptionalString(masterDcId));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check CC
|
||||||
|
ProcessClass::Fitness bestClusterControllerFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[ccDcId], ProcessClass::ClusterController);
|
||||||
|
if (!nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) || nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].second.machineClassFitness(ProcessClass::ClusterController) != bestClusterControllerFitness) {
|
||||||
|
TraceEvent("ConsistencyCheck_ClusterControllerNotBest").detail("BestClusterControllerFitness", bestClusterControllerFitness).detail("ExistingClusterControllerFit", nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) ? nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].second.machineClassFitness(ProcessClass::ClusterController) : -1);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check Master
|
||||||
|
ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Master);
|
||||||
if (bestMasterFitness == ProcessClass::NeverAssign) {
|
if (bestMasterFitness == ProcessClass::NeverAssign) {
|
||||||
bestMasterFitness = getBestAvailableFitness(allClassTypes, ProcessClass::Master);
|
bestMasterFitness = getBestAvailableFitness(dcToAllClassTypes[masterDcId], ProcessClass::Master);
|
||||||
if (bestMasterFitness != ProcessClass::NeverAssign) {
|
if (bestMasterFitness != ProcessClass::NeverAssign) {
|
||||||
bestMasterFitness = ProcessClass::ExcludeFit;
|
bestMasterFitness = ProcessClass::ExcludeFit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!allWorkerProcessMap.count(db.master.address()) || (!nonExcludedWorkerProcessMap.count(db.master.address()) && bestMasterFitness != ProcessClass::ExcludeFit) || nonExcludedWorkerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
|
if ((!nonExcludedWorkerProcessMap.count(db.master.address()) && bestMasterFitness != ProcessClass::ExcludeFit) || nonExcludedWorkerProcessMap[db.master.address()].second.machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
|
||||||
TraceEvent("ConsistencyCheck_MasterNotBest").detail("BestMasterFitness", bestMasterFitness).detail("ExistingMasterFit", nonExcludedWorkerProcessMap.count(db.master.address()) ? nonExcludedWorkerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) : -1);
|
TraceEvent("ConsistencyCheck_MasterNotBest").detail("BestMasterFitness", bestMasterFitness).detail("ExistingMasterFit", nonExcludedWorkerProcessMap.count(db.master.address()) ? nonExcludedWorkerProcessMap[db.master.address()].second.machineClassFitness(ProcessClass::Master) : -1);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check master proxy
|
// Check proxy
|
||||||
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::Proxy);
|
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Proxy);
|
||||||
for (auto masterProxy : db.client.proxies) {
|
for (auto masterProxy : db.client.proxies) {
|
||||||
if (!nonExcludedWorkerProcessMap.count(masterProxy.address()) || nonExcludedWorkerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
|
if (!nonExcludedWorkerProcessMap.count(masterProxy.address()) || nonExcludedWorkerProcessMap[masterProxy.address()].second.machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
|
||||||
TraceEvent("ConsistencyCheck_ProxyNotBest").detail("BestMasterProxyFitness", bestMasterProxyFitness).detail("ExistingMasterProxyFitness", nonExcludedWorkerProcessMap.count(masterProxy.address()) ? nonExcludedWorkerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) : -1);
|
TraceEvent("ConsistencyCheck_ProxyNotBest").detail("BestMasterProxyFitness", bestMasterProxyFitness).detail("ExistingMasterProxyFitness", nonExcludedWorkerProcessMap.count(masterProxy.address()) ? nonExcludedWorkerProcessMap[masterProxy.address()].second.machineClassFitness(ProcessClass::Proxy) : -1);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check resolver
|
// Check resolver
|
||||||
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(nonExcludedClassTypes, ProcessClass::Resolver);
|
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Resolver);
|
||||||
for (auto resolver : db.resolvers) {
|
for (auto resolver : db.resolvers) {
|
||||||
if (!nonExcludedWorkerProcessMap.count(resolver.address()) || nonExcludedWorkerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
|
if (!nonExcludedWorkerProcessMap.count(resolver.address()) || nonExcludedWorkerProcessMap[resolver.address()].second.machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
|
||||||
TraceEvent("ConsistencyCheck_ResolverNotBest").detail("BestResolverFitness", bestResolverFitness).detail("ExistingResolverFitness", nonExcludedWorkerProcessMap.count(resolver.address()) ? nonExcludedWorkerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) : -1);
|
TraceEvent("ConsistencyCheck_ResolverNotBest").detail("BestResolverFitness", bestResolverFitness).detail("ExistingResolverFitness", nonExcludedWorkerProcessMap.count(resolver.address()) ? nonExcludedWorkerProcessMap[resolver.address()].second.machineClassFitness(ProcessClass::Resolver) : -1);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check LogRouter
|
||||||
|
if (g_network->isSimulated() && config.usableRegions> 1 && g_simulator.primaryDcId.present() &&
|
||||||
|
!g_simulator.datacenterDead(g_simulator.primaryDcId) && !g_simulator.datacenterDead(g_simulator.remoteDcId)) {
|
||||||
|
for (auto &tlogSet : db.logSystemConfig.tLogs) {
|
||||||
|
if (!tlogSet.isLocal && tlogSet.logRouters.size()) {
|
||||||
|
for (auto &logRouter : tlogSet.logRouters) {
|
||||||
|
if (!nonExcludedWorkerProcessMap.count(logRouter.interf().address())) {
|
||||||
|
TraceEvent("ConsistencyCheck_LogRouterNotInNonExcludedWorkers").detail("Id", logRouter.id());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (logRouter.interf().locality.dcId() != expectedRemoteDcId) {
|
||||||
|
TraceEvent("ConsistencyCheck_LogRouterNotBestDC").detail("expectedDC", getOptionalString(expectedRemoteDcId)).detail("ActualDC", getOptionalString(logRouter.interf().locality.dcId()));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Check Tlog
|
// TODO: Check Tlog
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue