replaced std::pair<WorkerInterface,ProcessClass> with a struct named WorkerDetails

This commit is contained in:
Evan Tschannen 2019-03-08 11:25:07 -05:00
parent 82d957e0bb
commit 710a64dc4e
16 changed files with 289 additions and 278 deletions

View File

@ -51,37 +51,35 @@ struct WorkerInfo : NonCopyable {
Generation gen;
int reboots;
double lastAvailableTime;
WorkerInterface interf;
ProcessClass initialClass;
ProcessClass processClass;
ClusterControllerPriorityInfo priorityInfo;
WorkerDetails details;
WorkerInfo() : gen(-1), reboots(0), lastAvailableTime(now()), priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), lastAvailableTime(now()), interf(interf), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo) {}
WorkerInfo( Future<Void> watcher, ReplyPromise<RegisterWorkerReply> reply, Generation gen, WorkerInterface interf, ProcessClass initialClass, ProcessClass processClass, ClusterControllerPriorityInfo priorityInfo, bool degraded ) :
watcher(watcher), reply(reply), gen(gen), reboots(0), lastAvailableTime(now()), initialClass(initialClass), priorityInfo(priorityInfo), details(interf, processClass, degraded) {}
WorkerInfo( WorkerInfo&& r ) noexcept(true) : watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen),
reboots(r.reboots), lastAvailableTime(r.lastAvailableTime), interf(std::move(r.interf)), initialClass(r.initialClass), processClass(r.processClass), priorityInfo(r.priorityInfo) {}
reboots(r.reboots), lastAvailableTime(r.lastAvailableTime), initialClass(r.initialClass), priorityInfo(r.priorityInfo), details(std::move(r.details)) {}
void operator=( WorkerInfo&& r ) noexcept(true) {
watcher = std::move(r.watcher);
reply = std::move(r.reply);
gen = r.gen;
reboots = r.reboots;
lastAvailableTime = r.lastAvailableTime;
interf = std::move(r.interf);
initialClass = r.initialClass;
processClass = r.processClass;
priorityInfo = r.priorityInfo;
details = std::move(r.details);
}
};
struct WorkerFitnessInfo {
std::pair<WorkerInterface, ProcessClass> worker;
WorkerDetails worker;
ProcessClass::Fitness fitness;
int used;
WorkerFitnessInfo() : fitness(ProcessClass::NeverAssign), used(0) {}
WorkerFitnessInfo(std::pair<WorkerInterface, ProcessClass> worker, ProcessClass::Fitness fitness, int used) : worker(worker), fitness(fitness), used(used) {}
WorkerFitnessInfo(WorkerDetails worker, ProcessClass::Fitness fitness, int used) : worker(worker), fitness(fitness), used(used) {}
};
class ClusterControllerData {
@ -184,35 +182,35 @@ public:
};
bool workerAvailable( WorkerInfo const& worker, bool checkStable ) {
return ( now() - startTime < 2 * FLOW_KNOBS->SERVER_REQUEST_INTERVAL ) || ( IFailureMonitor::failureMonitor().getState(worker.interf.storage.getEndpoint()).isAvailable() && ( !checkStable || worker.reboots < 2 ) );
return ( now() - startTime < 2 * FLOW_KNOBS->SERVER_REQUEST_INTERVAL ) || ( IFailureMonitor::failureMonitor().getState(worker.details.interf.storage.getEndpoint()).isAvailable() && ( !checkStable || worker.reboots < 2 ) );
}
std::pair<WorkerInterface, ProcessClass> getStorageWorker( RecruitStorageRequest const& req ) {
WorkerDetails getStorageWorker( RecruitStorageRequest const& req ) {
std::set<Optional<Standalone<StringRef>>> excludedMachines( req.excludeMachines.begin(), req.excludeMachines.end() );
std::set<Optional<Standalone<StringRef>>> includeDCs( req.includeDCs.begin(), req.includeDCs.end() );
std::set<AddressExclusion> excludedAddresses( req.excludeAddresses.begin(), req.excludeAddresses.end() );
for( auto& it : id_worker )
if( workerAvailable( it.second, false ) &&
!excludedMachines.count(it.second.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.interf.address()) &&
it.second.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) {
return std::make_pair(it.second.interf, it.second.processClass);
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.details.interf.address()) &&
it.second.details.processClass.machineClassFitness( ProcessClass::Storage ) <= ProcessClass::UnsetFit ) {
return it.second.details;
}
if( req.criticalRecruitment ) {
ProcessClass::Fitness bestFit = ProcessClass::NeverAssign;
Optional<std::pair<WorkerInterface, ProcessClass>> bestInfo;
Optional<WorkerDetails> bestInfo;
for( auto& it : id_worker ) {
ProcessClass::Fitness fit = it.second.processClass.machineClassFitness( ProcessClass::Storage );
ProcessClass::Fitness fit = it.second.details.processClass.machineClassFitness( ProcessClass::Storage );
if( workerAvailable( it.second, false ) &&
!excludedMachines.count(it.second.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.interf.address()) &&
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
( includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId()) ) &&
!addressExcluded(excludedAddresses, it.second.details.interf.address()) &&
fit < bestFit ) {
bestFit = fit;
bestInfo = std::make_pair(it.second.interf, it.second.processClass);
bestInfo = it.second.details;
}
}
@ -224,23 +222,23 @@ public:
throw no_more_servers();
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForSeedServers( DatabaseConfiguration const& conf, IRepPolicyRef const& policy, Optional<Optional<Standalone<StringRef>>> const& dcId = Optional<Optional<Standalone<StringRef>>>() ) {
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
LocalitySetRef logServerSet = Reference<LocalitySet>(new LocalityMap<std::pair<WorkerInterface, ProcessClass>>());
LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap = (LocalityMap<std::pair<WorkerInterface, ProcessClass>>*) logServerSet.getPtr();
std::vector<WorkerDetails> getWorkersForSeedServers( DatabaseConfiguration const& conf, IRepPolicyRef const& policy, Optional<Optional<Standalone<StringRef>>> const& dcId = Optional<Optional<Standalone<StringRef>>>() ) {
std::map<ProcessClass::Fitness, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
LocalitySetRef logServerSet = Reference<LocalitySet>(new LocalityMap<WorkerDetails>());
LocalityMap<WorkerDetails>* logServerMap = (LocalityMap<WorkerDetails>*) logServerSet.getPtr();
bool bCompleted = false;
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( ProcessClass::Storage );
if( workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.interf.address()) && fitness != ProcessClass::NeverAssign && ( !dcId.present() || it.second.interf.locality.dcId()==dcId.get() ) ) {
fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
auto fitness = it.second.details.processClass.machineClassFitness( ProcessClass::Storage );
if( workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && ( !dcId.present() || it.second.details.interf.locality.dcId()==dcId.get() ) ) {
fitness_workers[ fitness ].push_back(it.second.details);
}
}
for( auto& it : fitness_workers ) {
for (auto& worker : it.second ) {
logServerMap->add(worker.first.locality, &worker);
logServerMap->add(worker.interf.locality, &worker);
}
std::vector<LocalityEntry> bestSet;
@ -265,24 +263,24 @@ public:
return results;
}
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, IRepPolicyRef const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>() ) {
std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::vector<std::pair<WorkerInterface, ProcessClass>> results;
std::vector<WorkerDetails> getWorkersForTlogs( DatabaseConfiguration const& conf, int32_t required, int32_t desired, IRepPolicyRef const& policy, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false, std::set<Optional<Key>> dcIds = std::set<Optional<Key>>() ) {
std::map<ProcessClass::Fitness, vector<WorkerDetails>> fitness_workers;
std::vector<WorkerDetails> results;
std::vector<LocalityData> unavailableLocals;
LocalitySetRef logServerSet;
LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap;
LocalityMap<WorkerDetails>* logServerMap;
bool bCompleted = false;
logServerSet = Reference<LocalitySet>(new LocalityMap<std::pair<WorkerInterface, ProcessClass>>());
logServerMap = (LocalityMap<std::pair<WorkerInterface, ProcessClass>>*) logServerSet.getPtr();
logServerSet = Reference<LocalitySet>(new LocalityMap<WorkerDetails>());
logServerMap = (LocalityMap<WorkerDetails>*) logServerSet.getPtr();
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( ProcessClass::TLog );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.interf.locality.dcId())) ) {
fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
auto fitness = it.second.details.processClass.machineClassFitness( ProcessClass::TLog );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && fitness != ProcessClass::NeverAssign && (!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId())) ) {
fitness_workers[ fitness ].push_back(it.second.details);
}
else {
unavailableLocals.push_back(it.second.interf.locality);
unavailableLocals.push_back(it.second.details.interf.locality);
}
}
@ -293,7 +291,7 @@ public:
if (fitness_workers.find(fitnessEnum) == fitness_workers.end())
continue;
for (auto& worker : fitness_workers[(ProcessClass::Fitness) fitness] ) {
logServerMap->add(worker.first.locality, &worker);
logServerMap->add(worker.interf.locality, &worker);
}
if (logServerSet->size() < required) {
TraceEvent(SevWarn,"GWFTADTooFew", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("Required", required).detail("TLogPolicy", policy->info()).detail("DesiredLogs", desired);
@ -320,7 +318,7 @@ public:
auto object = logServerMap->getObject(entry);
ASSERT(object);
results.push_back(*object);
tLocalities.push_back(object->first.locality);
tLocalities.push_back(object->interf.locality);
}
TraceEvent("GWFTADBestResults", id).detail("Fitness", fitness).detail("Processes", logServerSet->size()).detail("BestCount", bestSet.size()).detail("BestZones", ::describeZones(tLocalities))
.detail("BestDataHalls", ::describeDataHalls(tLocalities)).detail("TLogPolicy", policy->info()).detail("TotalResults", results.size()).detail("DesiredLogs", desired);
@ -335,7 +333,7 @@ public:
if (!bCompleted) {
std::vector<LocalityData> tLocalities;
for (auto& object : logServerMap->getObjects()) {
tLocalities.push_back(object->first.locality);
tLocalities.push_back(object->interf.locality);
}
TraceEvent(SevWarn, "GetTLogTeamFailed").detail("Policy", policy->info()).detail("Processes", logServerSet->size()).detail("Workers", id_worker.size()).detail("FitnessGroups", fitness_workers.size())
@ -349,7 +347,7 @@ public:
}
for (auto& result : results) {
id_used[result.first.locality.processId()]++;
id_used[result.interf.locality.processId()]++;
}
TraceEvent("GetTLogTeamDone").detail("Completed", bCompleted).detail("Policy", policy->info()).detail("Results", results.size()).detail("Processes", logServerSet->size()).detail("Workers", id_worker.size())
@ -362,7 +360,7 @@ public:
}
//FIXME: This logic will fallback unnecessarily when usable dcs > 1 because it does not check all combinations of potential satellite locations
std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForSatelliteLogs( const DatabaseConfiguration& conf, const RegionInfo& region, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool& satelliteFallback, bool checkStable = false ) {
std::vector<WorkerDetails> getWorkersForSatelliteLogs( const DatabaseConfiguration& conf, const RegionInfo& region, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool& satelliteFallback, bool checkStable = false ) {
int startDC = 0;
loop {
if(startDC > 0 && startDC >= region.satellites.size() + 1 - (satelliteFallback ? region.satelliteTLogUsableDcsFallback : region.satelliteTLogUsableDcs)) {
@ -399,15 +397,15 @@ public:
}
WorkerFitnessInfo getWorkerForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, ProcessClass::Fitness unacceptableFitness, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
std::map<std::pair<ProcessClass::Fitness,int>, vector<WorkerDetails>> fitness_workers;
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( role );
if(conf.isExcludedServer(it.second.interf.address())) {
auto fitness = it.second.details.processClass.machineClassFitness( role );
if(conf.isExcludedServer(it.second.details.interf.address())) {
fitness = std::max(fitness, ProcessClass::ExcludeFit);
}
if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.interf.locality.dcId()==dcId ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
if( workerAvailable(it.second, checkStable) && fitness < unacceptableFitness && it.second.details.interf.locality.dcId()==dcId ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(it.second.details);
}
}
@ -415,7 +413,7 @@ public:
auto& w = it.second;
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
id_used[w[i].first.locality.processId()]++;
id_used[w[i].interf.locality.processId()]++;
return WorkerFitnessInfo(w[i], it.first.first, it.first.second);
}
}
@ -423,17 +421,17 @@ public:
throw no_more_servers();
}
vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
vector<std::pair<WorkerInterface, ProcessClass>> results;
vector<WorkerDetails> getWorkersForRoleInDatacenter(Optional<Standalone<StringRef>> const& dcId, ProcessClass::ClusterRole role, int amount, DatabaseConfiguration const& conf, std::map< Optional<Standalone<StringRef>>, int>& id_used, Optional<WorkerFitnessInfo> minWorker = Optional<WorkerFitnessInfo>(), bool checkStable = false ) {
std::map<std::pair<ProcessClass::Fitness,int>, vector<WorkerDetails>> fitness_workers;
vector<WorkerDetails> results;
if (amount <= 0)
return results;
for( auto& it : id_worker ) {
auto fitness = it.second.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && it.second.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.interf.id() != minWorker.get().worker.first.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(std::make_pair(it.second.interf, it.second.processClass));
auto fitness = it.second.details.processClass.machineClassFitness( role );
if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.details.interf.address()) && it.second.details.interf.locality.dcId() == dcId &&
( !minWorker.present() || ( it.second.details.interf.id() != minWorker.get().worker.interf.id() && ( fitness < minWorker.get().fitness || (fitness == minWorker.get().fitness && id_used[it.first] <= minWorker.get().used ) ) ) ) ) {
fitness_workers[ std::make_pair(fitness, id_used[it.first]) ].push_back(it.second.details);
}
}
@ -442,7 +440,7 @@ public:
g_random->randomShuffle(w);
for( int i=0; i < w.size(); i++ ) {
results.push_back(w[i]);
id_used[w[i].first.locality.processId()]++;
id_used[w[i].interf.locality.processId()]++;
if( results.size() == amount )
return results;
}
@ -465,11 +463,11 @@ public:
RoleFitness(RoleFitness first, RoleFitness second, ProcessClass::ClusterRole role) : bestFit(std::min(first.worstFit, second.worstFit)), worstFit(std::max(first.worstFit, second.worstFit)), count(first.count + second.count), role(role) { }
RoleFitness( vector<std::pair<WorkerInterface, ProcessClass>> workers, ProcessClass::ClusterRole role ) : role(role) {
RoleFitness( vector<WorkerDetails> workers, ProcessClass::ClusterRole role ) : role(role) {
worstFit = ProcessClass::BestFit;
bestFit = ProcessClass::NeverAssign;
for(auto it : workers) {
auto thisFit = it.second.machineClassFitness( role );
auto thisFit = it.processClass.machineClassFitness( role );
worstFit = std::max(worstFit, thisFit);
bestFit = std::min(bestFit, thisFit);
}
@ -513,8 +511,8 @@ public:
std::set<Optional<Standalone<StringRef>>> getDatacenters( DatabaseConfiguration const& conf, bool checkStable = false ) {
std::set<Optional<Standalone<StringRef>>> result;
for( auto& it : id_worker )
if( workerAvailable( it.second, checkStable ) && !conf.isExcludedServer( it.second.interf.address() ) )
result.insert(it.second.interf.locality.dcId());
if( workerAvailable( it.second, checkStable ) && !conf.isExcludedServer( it.second.details.interf.address() ) )
result.insert(it.second.details.interf.locality.dcId());
return result;
}
@ -537,12 +535,12 @@ public:
auto remoteLogs = getWorkersForTlogs( req.configuration, req.configuration.getRemoteTLogReplicationFactor(), req.configuration.getDesiredRemoteLogs(), req.configuration.getRemoteTLogPolicy(), id_used, false, remoteDC );
for(int i = 0; i < remoteLogs.size(); i++) {
result.remoteTLogs.push_back(remoteLogs[i].first);
result.remoteTLogs.push_back(remoteLogs[i].interf);
}
auto logRouters = getWorkersForRoleInDatacenter( req.dcId, ProcessClass::LogRouter, req.logRouterCount, req.configuration, id_used );
for(int i = 0; i < logRouters.size(); i++) {
result.logRouters.push_back(logRouters[i].first);
result.logRouters.push_back(logRouters[i].interf);
}
if(!remoteStartTime.present()) {
@ -587,20 +585,20 @@ public:
if(req.recruitSeedServers) {
auto primaryStorageServers = getWorkersForSeedServers( req.configuration, req.configuration.storagePolicy, dcId );
for(int i = 0; i < primaryStorageServers.size(); i++) {
result.storageServers.push_back(primaryStorageServers[i].first);
result.storageServers.push_back(primaryStorageServers[i].interf);
}
}
auto tlogs = getWorkersForTlogs( req.configuration, req.configuration.tLogReplicationFactor, req.configuration.getDesiredLogs(), req.configuration.tLogPolicy, id_used, false, primaryDC );
for(int i = 0; i < tlogs.size(); i++) {
result.tLogs.push_back(tlogs[i].first);
result.tLogs.push_back(tlogs[i].interf);
}
std::vector<std::pair<WorkerInterface, ProcessClass>> satelliteLogs;
std::vector<WorkerDetails> satelliteLogs;
if(region.satelliteTLogReplicationFactor > 0) {
satelliteLogs = getWorkersForSatelliteLogs( req.configuration, region, id_used, result.satelliteFallback );
for(int i = 0; i < satelliteLogs.size(); i++) {
result.satelliteTLogs.push_back(satelliteLogs[i].first);
result.satelliteTLogs.push_back(satelliteLogs[i].interf);
}
}
@ -614,13 +612,13 @@ public:
resolvers.push_back(first_resolver.worker);
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
result.resolvers.push_back(resolvers[i].interf);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
result.proxies.push_back(proxies[i].interf);
auto oldLogRouters = getWorkersForRoleInDatacenter( dcId, ProcessClass::LogRouter, req.maxOldLogRouters, req.configuration, id_used );
for(int i = 0; i < oldLogRouters.size(); i++) {
result.oldLogRouters.push_back(oldLogRouters[i].first);
result.oldLogRouters.push_back(oldLogRouters[i].interf);
}
if( now() - startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY &&
@ -699,13 +697,13 @@ public:
updateKnownIds(&id_used);
auto tlogs = getWorkersForTlogs( req.configuration, req.configuration.tLogReplicationFactor, req.configuration.getDesiredLogs(), req.configuration.tLogPolicy, id_used );
for(int i = 0; i < tlogs.size(); i++) {
result.tLogs.push_back(tlogs[i].first);
result.tLogs.push_back(tlogs[i].interf);
}
if(req.recruitSeedServers) {
auto primaryStorageServers = getWorkersForSeedServers( req.configuration, req.configuration.storagePolicy );
for(int i = 0; i < primaryStorageServers.size(); i++)
result.storageServers.push_back(primaryStorageServers[i].first);
result.storageServers.push_back(primaryStorageServers[i].interf);
}
auto datacenters = getDatacenters( req.configuration );
@ -733,13 +731,13 @@ public:
bestFitness = fitness;
bestDC = dcId;
for(int i = 0; i < resolvers.size(); i++)
result.resolvers.push_back(resolvers[i].first);
result.resolvers.push_back(resolvers[i].interf);
for(int i = 0; i < proxies.size(); i++)
result.proxies.push_back(proxies[i].first);
result.proxies.push_back(proxies[i].interf);
auto oldLogRouters = getWorkersForRoleInDatacenter( dcId, ProcessClass::LogRouter, req.maxOldLogRouters, req.configuration, used );
for(int i = 0; i < oldLogRouters.size(); i++) {
result.oldLogRouters.push_back(oldLogRouters[i].first);
result.oldLogRouters.push_back(oldLogRouters[i].interf);
}
break;
} else {
@ -851,10 +849,10 @@ public:
}
// Get tlog processes
std::vector<std::pair<WorkerInterface, ProcessClass>> tlogs;
std::vector<std::pair<WorkerInterface, ProcessClass>> remote_tlogs;
std::vector<std::pair<WorkerInterface, ProcessClass>> satellite_tlogs;
std::vector<std::pair<WorkerInterface, ProcessClass>> log_routers;
std::vector<WorkerDetails> tlogs;
std::vector<WorkerDetails> remote_tlogs;
std::vector<WorkerDetails> satellite_tlogs;
std::vector<WorkerDetails> log_routers;
std::set<NetworkAddress> logRouterAddresses;
for( auto& logSet : dbi.logSystemConfig.tLogs ) {
@ -866,12 +864,12 @@ public:
return true;
if(logSet.isLocal && logSet.locality == tagLocalitySatellite) {
satellite_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
satellite_tlogs.push_back(tlogWorker->second.details);
}
else if(logSet.isLocal) {
tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
tlogs.push_back(tlogWorker->second.details);
} else {
remote_tlogs.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
remote_tlogs.push_back(tlogWorker->second.details);
}
}
@ -881,37 +879,37 @@ public:
return false;
if ( tlogWorker->second.priorityInfo.isExcluded )
return true;
if( !logRouterAddresses.count( tlogWorker->second.interf.address() ) ) {
logRouterAddresses.insert( tlogWorker->second.interf.address() );
log_routers.push_back(std::make_pair(tlogWorker->second.interf, tlogWorker->second.processClass));
if( !logRouterAddresses.count( tlogWorker->second.details.interf.address() ) ) {
logRouterAddresses.insert( tlogWorker->second.details.interf.address() );
log_routers.push_back(tlogWorker->second.details);
}
}
}
// Get proxy classes
std::vector<ProcessClass> proxyClasses;
std::vector<WorkerDetails> proxyClasses;
for(auto& it : dbi.client.proxies ) {
auto proxyWorker = id_worker.find(it.locality.processId());
if ( proxyWorker == id_worker.end() )
return false;
if ( proxyWorker->second.priorityInfo.isExcluded )
return true;
proxyClasses.push_back(proxyWorker->second.processClass);
proxyClasses.push_back(proxyWorker->second.details);
}
// Get resolver classes
std::vector<ProcessClass> resolverClasses;
std::vector<WorkerDetails> resolverClasses;
for(auto& it : dbi.resolvers ) {
auto resolverWorker = id_worker.find(it.locality.processId());
if ( resolverWorker == id_worker.end() )
return false;
if ( resolverWorker->second.priorityInfo.isExcluded )
return true;
resolverClasses.push_back(resolverWorker->second.processClass);
resolverClasses.push_back(resolverWorker->second.details);
}
// Check master fitness. Don't return false if master is excluded in case all the processes are excluded, we still need master for recovery.
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
ProcessClass::Fitness oldMasterFit = masterWorker->second.details.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
}
@ -925,7 +923,7 @@ public:
if ( oldMasterFit < mworker.fitness )
return false;
if ( oldMasterFit > mworker.fitness || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.worker.first.locality.processId() != clusterControllerProcessId ) )
if ( oldMasterFit > mworker.fitness || ( dbi.master.locality.processId() == clusterControllerProcessId && mworker.worker.interf.locality.processId() != clusterControllerProcessId ) )
return true;
std::set<Optional<Key>> primaryDC;
@ -1107,9 +1105,9 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
id_used[cluster->db.serverInfo->get().distributor.get().locality.processId()]++;
}
state WorkerFitnessInfo masterWorker = cluster->getWorkerForRoleInDatacenter(cluster->clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db->config, id_used);
if( ( masterWorker.worker.second.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.first.locality.processId() == cluster->clusterControllerProcessId )
if( ( masterWorker.worker.processClass.machineClassFitness( ProcessClass::Master ) > SERVER_KNOBS->EXPECTED_MASTER_FITNESS || masterWorker.worker.interf.locality.processId() == cluster->clusterControllerProcessId )
&& now() - cluster->startTime < SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY ) {
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.worker.second.machineClassFitness( ProcessClass::Master ));
TraceEvent("CCWDB", cluster->id).detail("Fitness", masterWorker.worker.processClass.machineClassFitness( ProcessClass::Master ));
wait( delay(SERVER_KNOBS->ATTEMPT_RECRUITMENT_DELAY) );
continue;
}
@ -1117,9 +1115,9 @@ ACTOR Future<Void> clusterWatchDatabase( ClusterControllerData* cluster, Cluster
rmq.lifetime = db->serverInfo->get().masterLifetime;
rmq.forceRecovery = db->forceRecovery;
cluster->masterProcessId = masterWorker.worker.first.locality.processId();
cluster->masterProcessId = masterWorker.worker.interf.locality.processId();
cluster->db.unfinishedRecoveries++;
state Future<ErrorOr<MasterInterface>> fNewMaster = masterWorker.worker.first.master.tryGetReply( rmq );
state Future<ErrorOr<MasterInterface>> fNewMaster = masterWorker.worker.interf.master.tryGetReply( rmq );
wait( ready(fNewMaster) || db->forceMasterFailure.onTrigger() );
if (fNewMaster.isReady() && fNewMaster.get().present()) {
TraceEvent("CCWDB", cluster->id).detail("Recruited", fNewMaster.get().get().id());
@ -1296,8 +1294,8 @@ void checkOutstandingStorageRequests( ClusterControllerData* self ) {
auto worker = self->getStorageWorker(req.first);
RecruitStorageReply rep;
rep.worker = worker.first;
rep.processClass = worker.second;
rep.worker = worker.interf;
rep.processClass = worker.processClass;
req.first.reply.send( rep );
swapAndPop( &self->outstandingStorageRequests, i-- );
}
@ -1379,7 +1377,7 @@ ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass
when( wait( failed ) ) { // remove workers that have failed
WorkerInfo& failedWorkerInfo = cluster->id_worker[ worker.locality.processId() ];
if (!failedWorkerInfo.reply.isSet()) {
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.processClass, failedWorkerInfo.priorityInfo) );
failedWorkerInfo.reply.send( RegisterWorkerReply(failedWorkerInfo.details.processClass, failedWorkerInfo.priorityInfo) );
}
cluster->id_worker.erase( worker.locality.processId() );
cluster->updateWorkerList.set( worker.locality.processId(), Optional<ProcessData>() );
@ -1544,8 +1542,8 @@ void clusterRecruitStorage( ClusterControllerData* self, RecruitStorageRequest r
throw no_more_servers();
auto worker = self->getStorageWorker(req);
RecruitStorageReply rep;
rep.worker = worker.first;
rep.processClass = worker.second;
rep.worker = worker.interf;
rep.processClass = worker.processClass;
req.reply.send( rep );
} catch ( Error& e ) {
if (e.code() == error_code_no_more_servers) {
@ -1636,11 +1634,11 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
self->gotFullyRecoveredConfig = true;
db->fullyRecoveredConfig = req.configuration.get();
for ( auto& it : self->id_worker ) {
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.interf.address());
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(it.second.details.interf.address());
if ( it.second.priorityInfo.isExcluded != isExcludedFromConfig ) {
it.second.priorityInfo.isExcluded = isExcludedFromConfig;
if( !it.second.reply.isSet() ) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
it.second.reply.send( RegisterWorkerReply( it.second.details.processClass, it.second.priorityInfo ) );
}
}
}
@ -1727,7 +1725,7 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
}
// Check process class and exclusive property
if ( info == self->id_worker.end() || info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if ( info == self->id_worker.end() || info->second.details.interf.id() != w.id() || req.generation >= info->second.gen ) {
if ( self->gotProcessClasses ) {
auto classIter = self->id_class.find(w.locality.processId());
@ -1755,23 +1753,23 @@ void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
self->db.setDistributor( di );
}
if( info == self->id_worker.end() ) {
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo );
self->id_worker[w.locality.processId()] = WorkerInfo( workerAvailabilityWatch( w, newProcessClass, self ), req.reply, req.generation, w, req.initialClass, newProcessClass, newPriorityInfo, false );
checkOutstandingRequests( self );
return;
}
if( info->second.interf.id() != w.id() || req.generation >= info->second.gen ) {
if( info->second.details.interf.id() != w.id() || req.generation >= info->second.gen ) {
if (!info->second.reply.isSet()) {
info->second.reply.send( Never() );
}
info->second.reply = req.reply;
info->second.processClass = newProcessClass;
info->second.details.processClass = newProcessClass;
info->second.priorityInfo = newPriorityInfo;
info->second.initialClass = req.initialClass;
info->second.gen = req.generation;
if(info->second.interf.id() != w.id()) {
info->second.interf = w;
if(info->second.details.interf.id() != w.id()) {
info->second.details.interf = w;
info->second.watcher = workerAvailabilityWatch( w, newProcessClass, self );
}
checkOutstandingRequests( self );
@ -1893,9 +1891,9 @@ ACTOR Future<Void> statusServer(FutureStream< StatusRequest> requests,
}
// Get status but trap errors to send back to client.
vector<std::pair<WorkerInterface, ProcessClass>> workers;
vector<WorkerDetails> workers;
for(auto& it : self->id_worker)
workers.push_back(std::make_pair(it.second.interf, it.second.processClass));
workers.push_back(it.second.details);
std::vector<NetworkAddress> incompatibleConnections;
for(auto it = self->db.incompatibleConnections.begin(); it != self->db.incompatibleConnections.end();) {
@ -1992,11 +1990,11 @@ ACTOR Future<Void> monitorProcessClasses(ClusterControllerData *self) {
}
if (newProcessClass != w.second.processClass) {
w.second.processClass = newProcessClass;
if (newProcessClass != w.second.details.processClass) {
w.second.details.processClass = newProcessClass;
w.second.priorityInfo.processClassFitness = newProcessClass.machineClassFitness(ProcessClass::ClusterController);
if (!w.second.reply.isSet()) {
w.second.reply.send( RegisterWorkerReply(w.second.processClass, w.second.priorityInfo) );
w.second.reply.send( RegisterWorkerReply(w.second.details.processClass, w.second.priorityInfo) );
}
}
}
@ -2098,14 +2096,14 @@ ACTOR Future<Void> updatedChangingDatacenters(ClusterControllerData *self) {
self->changingDcIds.set(std::make_pair(false,self->desiredDcIds.get()));
} else {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->desiredDcIds.get().get() );
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.details.interf.locality.dcId(), self->desiredDcIds.get().get() );
self->changingDcIds.set(std::make_pair(worker.priorityInfo.dcFitness > newFitness,self->desiredDcIds.get()));
TraceEvent("UpdateChangingDatacenter", self->id).detail("OldFitness", worker.priorityInfo.dcFitness).detail("NewFitness", newFitness);
if ( worker.priorityInfo.dcFitness > newFitness ) {
worker.priorityInfo.dcFitness = newFitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
worker.reply.send( RegisterWorkerReply( worker.details.processClass, worker.priorityInfo ) );
}
} else {
state int currentFit = ProcessClass::BestFit;
@ -2113,12 +2111,12 @@ ACTOR Future<Void> updatedChangingDatacenters(ClusterControllerData *self) {
bool updated = false;
for ( auto& it : self->id_worker ) {
if( ( !it.second.priorityInfo.isExcluded && it.second.priorityInfo.processClassFitness == currentFit ) || currentFit == ProcessClass::NeverAssign ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changingDcIds.get().second.get() );
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.details.interf.locality.dcId(), self->changingDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
updated = true;
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
it.second.reply.send( RegisterWorkerReply( it.second.details.processClass, it.second.priorityInfo ) );
}
}
}
@ -2153,11 +2151,11 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
TraceEvent("UpdateChangedDatacenter", self->id).detail("CCFirst", self->changedDcIds.get().first);
if( !self->changedDcIds.get().first ) {
auto& worker = self->id_worker[self->clusterControllerProcessId];
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.interf.locality.dcId(), self->changedDcIds.get().second.get() );
uint8_t newFitness = ClusterControllerPriorityInfo::calculateDCFitness( worker.details.interf.locality.dcId(), self->changedDcIds.get().second.get() );
if( worker.priorityInfo.dcFitness != newFitness ) {
worker.priorityInfo.dcFitness = newFitness;
if(!worker.reply.isSet()) {
worker.reply.send( RegisterWorkerReply( worker.processClass, worker.priorityInfo ) );
worker.reply.send( RegisterWorkerReply( worker.details.processClass, worker.priorityInfo ) );
}
}
} else {
@ -2166,12 +2164,12 @@ ACTOR Future<Void> updatedChangedDatacenters(ClusterControllerData *self) {
bool updated = false;
for ( auto& it : self->id_worker ) {
if( ( !it.second.priorityInfo.isExcluded && it.second.priorityInfo.processClassFitness == currentFit ) || currentFit == ProcessClass::NeverAssign ) {
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.interf.locality.dcId(), self->changedDcIds.get().second.get() );
uint8_t fitness = ClusterControllerPriorityInfo::calculateDCFitness( it.second.details.interf.locality.dcId(), self->changedDcIds.get().second.get() );
if ( it.first != self->clusterControllerProcessId && it.second.priorityInfo.dcFitness != fitness ) {
updated = true;
it.second.priorityInfo.dcFitness = fitness;
if(!it.second.reply.isSet()) {
it.second.reply.send( RegisterWorkerReply( it.second.processClass, it.second.priorityInfo ) );
it.second.reply.send( RegisterWorkerReply( it.second.details.processClass, it.second.priorityInfo ) );
}
}
}
@ -2323,11 +2321,11 @@ ACTOR Future<DataDistributorInterface> startDataDistributor( ClusterControllerDa
std::map<Optional<Standalone<StringRef>>, int> id_used = self->getUsedIds();
state WorkerFitnessInfo data_distributor = self->getWorkerForRoleInDatacenter(dcId, ProcessClass::DataDistributor, ProcessClass::NeverAssign, self->db.config, id_used);
req.reqId = g_random->randomUniqueID();
TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.first.address());
TraceEvent("ClusterController_DataDistributorRecruit", req.reqId).detail("Addr", data_distributor.worker.interf.address());
ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.first.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
ErrorOr<DataDistributorInterface> distributor = wait( data_distributor.worker.interf.dataDistributor.getReplyUnlessFailedFor(req, SERVER_KNOBS->WAIT_FOR_DISTRIBUTOR_JOIN_DELAY, 0) );
if (distributor.present()) {
TraceEvent("ClusterController_DataDistributorRecruited", req.reqId).detail("Addr", data_distributor.worker.first.address());
TraceEvent("ClusterController_DataDistributorRecruited", req.reqId).detail("Addr", data_distributor.worker.interf.address());
return distributor.get();
}
}
@ -2417,19 +2415,19 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
registerWorker( req, &self );
}
when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) {
vector<std::pair<WorkerInterface, ProcessClass>> workers;
vector<WorkerDetails> workers;
auto masterAddr = self.db.serverInfo->get().master.address();
for(auto& it : self.id_worker) {
if ( (req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) && self.db.config.isExcludedServer(it.second.interf.address()) ) {
if ( (req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) && self.db.config.isExcludedServer(it.second.details.interf.address()) ) {
continue;
}
if ( (req.flags & GetWorkersRequest::TESTER_CLASS_ONLY) && it.second.processClass.classType() != ProcessClass::TesterClass ) {
if ( (req.flags & GetWorkersRequest::TESTER_CLASS_ONLY) && it.second.details.processClass.classType() != ProcessClass::TesterClass ) {
continue;
}
workers.push_back(std::make_pair(it.second.interf, it.second.processClass));
workers.push_back(it.second.details);
}
req.reply.send( workers );
@ -2437,8 +2435,8 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
when( GetClientWorkersRequest req = waitNext( interf.clientInterface.getClientWorkers.getFuture() ) ) {
vector<ClientWorkerInterface> workers;
for(auto& it : self.id_worker) {
if (it.second.processClass.classType() != ProcessClass::TesterClass) {
workers.push_back(it.second.interf.clientInterface);
if (it.second.details.processClass.classType() != ProcessClass::TesterClass) {
workers.push_back(it.second.details.interf.clientInterface);
}
}
req.reply.send(workers);
@ -2446,7 +2444,7 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
when( wait( coordinationPingDelay ) ) {
CoordinationPingMessage message(self.id, step++);
for(auto& it : self.id_worker)
it.second.interf.coordinationPing.send(message);
it.second.details.interf.coordinationPing.send(message);
coordinationPingDelay = delay( SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY );
TraceEvent("CoordinationPingSent", self.id).detail("TimeStep", message.timeStep);
}

View File

@ -184,7 +184,7 @@ struct GetWorkersRequest {
enum { TESTER_CLASS_ONLY = 0x1, NON_EXCLUDED_PROCESSES_ONLY = 0x2 };
int flags;
ReplyPromise<vector<std::pair<WorkerInterface, ProcessClass>>> reply;
ReplyPromise<vector<WorkerDetails>> reply;
GetWorkersRequest() : flags(0) {}
explicit GetWorkersRequest(int fl) : flags(fl) {}

View File

@ -30,10 +30,10 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0 ) {
ACTOR Future<vector<WorkerDetails>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0 ) {
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( flags ) ) ) ) ) {
when( vector<WorkerDetails> w = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( flags ) ) ) ) ) {
return w;
}
when( wait( dbInfo->onChange() ) ) {}
@ -46,12 +46,12 @@ ACTOR Future<WorkerInterface> getMasterWorker( Database cx, Reference<AsyncVar<S
TraceEvent("GetMasterWorker").detail("Stage", "GettingWorkers");
loop {
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) );
state vector<WorkerDetails> workers = wait( getWorkers( dbInfo ) );
for( int i = 0; i < workers.size(); i++ ) {
if( workers[i].first.address() == dbInfo->get().master.address() ) {
TraceEvent("GetMasterWorker").detail("Stage", "GotWorkers").detail("MasterId", dbInfo->get().master.id()).detail("WorkerId", workers[i].first.id());
return workers[i].first;
if( workers[i].interf.address() == dbInfo->get().master.address() ) {
TraceEvent("GetMasterWorker").detail("Stage", "GotWorkers").detail("MasterId", dbInfo->get().master.id()).detail("WorkerId", workers[i].interf.id());
return workers[i].interf;
}
}
@ -69,15 +69,15 @@ ACTOR Future<WorkerInterface> getDataDistributorWorker( Database cx, Reference<A
TraceEvent("GetDataDistributorWorker").detail("Stage", "GettingWorkers");
loop {
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( dbInfo ) );
state vector<WorkerDetails> workers = wait( getWorkers( dbInfo ) );
if (!dbInfo->get().distributor.present()) continue;
for( int i = 0; i < workers.size(); i++ ) {
if( workers[i].first.address() == dbInfo->get().distributor.get().address() ) {
if( workers[i].interf.address() == dbInfo->get().distributor.get().address() ) {
TraceEvent("GetDataDistributorWorker").detail("Stage", "GotWorkers")
.detail("DataDistributorId", dbInfo->get().distributor.get().id())
.detail("WorkerId", workers[i].first.id());
return workers[i].first;
.detail("WorkerId", workers[i].interf.id());
return workers[i].interf;
}
}
@ -128,10 +128,10 @@ int64_t getQueueSize( const TraceEventFields& md ) {
ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
TraceEvent("MaxTLogQueueSize").detail("Stage", "ContactingLogs");
state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(getWorkers(dbInfo));
state std::vector<WorkerDetails> workers = wait(getWorkers(dbInfo));
std::map<NetworkAddress, WorkerInterface> workersMap;
for(auto worker : workers) {
workersMap[worker.first.address()] = worker.first;
workersMap[worker.interf.address()] = worker.interf;
}
state std::vector<Future<TraceEventFields>> messages;
@ -189,14 +189,14 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<Async
TraceEvent("MaxStorageServerQueueSize").detail("Stage", "ContactingStorageServers");
Future<std::vector<StorageServerInterface>> serversFuture = getStorageServers(cx);
state Future<std::vector<std::pair<WorkerInterface, ProcessClass>>> workersFuture = getWorkers(dbInfo);
state Future<std::vector<WorkerDetails>> workersFuture = getWorkers(dbInfo);
state std::vector<StorageServerInterface> servers = wait(serversFuture);
state std::vector<std::pair<WorkerInterface, ProcessClass>> workers = wait(workersFuture);
state std::vector<WorkerDetails> workers = wait(workersFuture);
std::map<NetworkAddress, WorkerInterface> workersMap;
for(auto worker : workers) {
workersMap[worker.first.address()] = worker.first;
workersMap[worker.interf.address()] = worker.interf;
}
state std::vector<Future<TraceEventFields>> messages;

View File

@ -35,7 +35,7 @@ Future<int64_t> getDataDistributionQueueSize( Database const &cx, Reference<Asyn
Future<bool> getTeamCollectionValid(Database const& cx, WorkerInterface const&);
Future<bool> getTeamCollectionValid(Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const&);
Future<vector<StorageServerInterface>> getStorageServers( Database const& cx, bool const &use_system_priority = false);
Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0 );
Future<vector<WorkerDetails>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0 );
Future<WorkerInterface> getMasterWorker( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<Void> repairDeadDatacenter(Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo, std::string const& context);

View File

@ -100,12 +100,12 @@ ACTOR static Future< Optional<TraceEventFields> > latestEventOnWorker(WorkerInte
}
}
ACTOR static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> > > latestEventOnWorkers(std::vector<std::pair<WorkerInterface, ProcessClass>> workers, std::string eventName) {
ACTOR static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> > > latestEventOnWorkers(std::vector<WorkerDetails> workers, std::string eventName) {
try {
state vector<Future<ErrorOr<TraceEventFields>>> eventTraces;
for (int c = 0; c < workers.size(); c++) {
EventLogRequest req = eventName.size() > 0 ? EventLogRequest(Standalone<StringRef>(eventName)) : EventLogRequest();
eventTraces.push_back(errorOr(timeoutError(workers[c].first.eventLogRequest.getReply(req), 2.0)));
eventTraces.push_back(errorOr(timeoutError(workers[c].interf.eventLogRequest.getReply(req), 2.0)));
}
wait(waitForAll(eventTraces));
@ -116,11 +116,11 @@ ACTOR static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> >
for (int i = 0; i < eventTraces.size(); i++) {
const ErrorOr<TraceEventFields>& v = eventTraces[i].get();
if (v.isError()){
failed.insert(workers[i].first.address().toString());
results[workers[i].first.address()] = TraceEventFields();
failed.insert(workers[i].interf.address().toString());
results[workers[i].interf.address()] = TraceEventFields();
}
else {
results[workers[i].first.address()] = v.get();
results[workers[i].interf.address()] = v.get();
}
}
@ -135,26 +135,26 @@ ACTOR static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> >
throw;
}
}
static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> > > latestErrorOnWorkers(std::vector<std::pair<WorkerInterface, ProcessClass>> workers) {
static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> > > latestErrorOnWorkers(std::vector<WorkerDetails> workers) {
return latestEventOnWorkers( workers, "" );
}
static Optional<std::pair<WorkerInterface, ProcessClass>> getWorker(std::vector<std::pair<WorkerInterface, ProcessClass>> const& workers, NetworkAddress const& address) {
static Optional<WorkerDetails> getWorker(std::vector<WorkerDetails> const& workers, NetworkAddress const& address) {
try {
for (int c = 0; c < workers.size(); c++)
if (address == workers[c].first.address())
if (address == workers[c].interf.address())
return workers[c];
return Optional<std::pair<WorkerInterface, ProcessClass>>();
return Optional<WorkerDetails>();
}
catch (Error &e){
return Optional<std::pair<WorkerInterface, ProcessClass>>();
return Optional<WorkerDetails>();
}
}
static Optional<std::pair<WorkerInterface, ProcessClass>> getWorker(std::map<NetworkAddress, std::pair<WorkerInterface, ProcessClass>> const& workersMap, NetworkAddress const& address) {
static Optional<WorkerDetails> getWorker(std::map<NetworkAddress, WorkerDetails> const& workersMap, NetworkAddress const& address) {
auto itr = workersMap.find(address);
if(itr == workersMap.end()) {
return Optional<std::pair<WorkerInterface, ProcessClass>>();
return Optional<WorkerDetails>();
}
return itr->second;
@ -261,7 +261,7 @@ static JsonBuilderObject getError(const TraceEventFields& errorFields) {
return statusObj;
}
static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std::pair<WorkerInterface, ProcessClass>> workers, Optional<DatabaseConfiguration> configuration, std::set<std::string> *incomplete_reasons) {
static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<WorkerDetails> workers, Optional<DatabaseConfiguration> configuration, std::set<std::string> *incomplete_reasons) {
JsonBuilderObject machineMap;
double metric;
int failed = 0;
@ -274,9 +274,9 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
std::map<std::string, JsonBuilderObject> machineJsonMap;
for (auto const& worker : workers){
locality[worker.first.address()] = worker.first.locality;
if (worker.first.locality.dcId().present())
dcIds[worker.first.address()] = worker.first.locality.dcId().get().printable();
locality[worker.interf.address()] = worker.interf.locality;
if (worker.interf.locality.dcId().present())
dcIds[worker.interf.address()] = worker.interf.locality.dcId().get().printable();
}
for(auto it = mMetrics.begin(); it != mMetrics.end(); it++) {
@ -540,7 +540,7 @@ struct RolesInfo {
ACTOR static Future<JsonBuilderObject> processStatusFetcher(
Reference<AsyncVar<struct ServerDBInfo>> db,
std::vector<std::pair<WorkerInterface, ProcessClass>> workers,
std::vector<WorkerDetails> workers,
WorkerEvents pMetrics,
WorkerEvents mMetrics,
WorkerEvents errors,
@ -581,13 +581,13 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
}
state std::map<Optional<Standalone<StringRef>>, MachineMemoryInfo> machineMemoryUsage;
state std::vector<std::pair<WorkerInterface, ProcessClass>>::iterator workerItr;
state std::vector<WorkerDetails>::iterator workerItr;
for(workerItr = workers.begin(); workerItr != workers.end(); ++workerItr) {
wait(yield());
state std::map<Optional<Standalone<StringRef>>, MachineMemoryInfo>::iterator memInfo = machineMemoryUsage.insert(std::make_pair(workerItr->first.locality.machineId(), MachineMemoryInfo())).first;
state std::map<Optional<Standalone<StringRef>>, MachineMemoryInfo>::iterator memInfo = machineMemoryUsage.insert(std::make_pair(workerItr->interf.locality.machineId(), MachineMemoryInfo())).first;
try {
ASSERT(pMetrics.count(workerItr->first.address()));
const TraceEventFields& processMetrics = pMetrics[workerItr->first.address()];
ASSERT(pMetrics.count(workerItr->interf.address()));
const TraceEventFields& processMetrics = pMetrics[workerItr->interf.address()];
if(memInfo->second.valid()) {
if(processMetrics.size() > 0) {
@ -647,10 +647,10 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
wait(yield());
state JsonBuilderObject statusObj;
try {
ASSERT(pMetrics.count(workerItr->first.address()));
ASSERT(pMetrics.count(workerItr->interf.address()));
NetworkAddress address = workerItr->first.address();
const TraceEventFields& event = pMetrics[workerItr->first.address()];
NetworkAddress address = workerItr->interf.address();
const TraceEventFields& event = pMetrics[workerItr->interf.address()];
statusObj["address"] = address.toString();
JsonBuilderObject memoryObj;
@ -661,7 +661,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
std::string MachineID = event.getValue("MachineID");
statusObj["machine_id"] = MachineID;
statusObj["locality"] = getLocalityInfo(workerItr->first.locality);
statusObj["locality"] = getLocalityInfo(workerItr->interf.locality);
statusObj.setKeyRawNumber("uptime_seconds",event.getValue("UptimeSeconds"));
@ -750,7 +750,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
double availableMemory;
availableMemory = mMetrics[address].getDouble("AvailableMemory");
auto machineMemInfo = machineMemoryUsage[workerItr->first.locality.machineId()];
auto machineMemInfo = machineMemoryUsage[workerItr->interf.locality.machineId()];
if (machineMemInfo.valid()) {
ASSERT(machineMemInfo.numProcesses > 0);
int64_t memory = (availableMemory + machineMemInfo.memoryUsage) / machineMemInfo.numProcesses;
@ -794,8 +794,8 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj["excluded"] = configuration.get().isExcludedServer(address);
}
statusObj["class_type"] = workerItr->second.toString();
statusObj["class_source"] = workerItr->second.sourceString();
statusObj["class_type"] = workerItr->processClass.toString();
statusObj["class_source"] = workerItr->processClass.sourceString();
}
catch (Error& e){
@ -803,7 +803,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
incomplete_reasons->insert("Cannot retrieve all process status information.");
}
processMap[printable(workerItr->first.locality.processId())] = statusObj;
processMap[printable(workerItr->interf.locality.processId())] = statusObj;
}
return processMap;
}
@ -847,11 +847,11 @@ static JsonBuilderObject clientStatusFetcher(ClientVersionMap clientVersionMap,
return clientStatus;
}
ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, int workerCount, std::set<std::string> *incomplete_reasons, int* statusCode) {
ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails mWorker, int workerCount, std::set<std::string> *incomplete_reasons, int* statusCode) {
state JsonBuilderObject message;
try {
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
TraceEventFields md = wait( timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = md.getInt("StatusCode");
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -1100,18 +1100,18 @@ static JsonBuilderObject configurationFetcher(Optional<DatabaseConfiguration> co
return statusObj;
}
ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterface, ProcessClass> ddWorker, int *minReplicasRemaining) {
ACTOR static Future<JsonBuilderObject> dataStatusFetcher(WorkerDetails ddWorker, int *minReplicasRemaining) {
state JsonBuilderObject statusObjData;
try {
std::vector<Future<TraceEventFields>> futures;
// TODO: Should this be serial?
futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStarting"))), 1.0));
futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStats"))), 1.0));
futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MovingData"))), 1.0));
futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlight"))), 1.0));
futures.push_back(timeoutError(ddWorker.first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlightRemote"))), 1.0));
futures.push_back(timeoutError(ddWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStarting"))), 1.0));
futures.push_back(timeoutError(ddWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("DDTrackerStats"))), 1.0));
futures.push_back(timeoutError(ddWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("MovingData"))), 1.0));
futures.push_back(timeoutError(ddWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlight"))), 1.0));
futures.push_back(timeoutError(ddWorker.interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("TotalDataInFlightRemote"))), 1.0));
std::vector<TraceEventFields> dataInfo = wait(getAll(futures));
@ -1324,16 +1324,16 @@ ACTOR static Future<vector<std::pair<MasterProxyInterface, EventMap>>> getProxie
return results;
}
static int getExtraTLogEligibleMachines(const vector<std::pair<WorkerInterface, ProcessClass>>& workers, const DatabaseConfiguration& configuration) {
static int getExtraTLogEligibleMachines(const vector<WorkerDetails>& workers, const DatabaseConfiguration& configuration) {
std::set<StringRef> allMachines;
std::map<Key,std::set<StringRef>> dcId_machine;
for(auto const& worker : workers) {
if(worker.second.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign
&& !configuration.isExcludedServer(worker.first.address()))
if(worker.processClass.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign
&& !configuration.isExcludedServer(worker.interf.address()))
{
allMachines.insert(worker.first.locality.zoneId().get());
if(worker.first.locality.dcId().present()) {
dcId_machine[worker.first.locality.dcId().get()].insert(worker.first.locality.zoneId().get());
allMachines.insert(worker.interf.locality.zoneId().get());
if(worker.interf.locality.dcId().present()) {
dcId_machine[worker.interf.locality.dcId().get()].insert(worker.interf.locality.zoneId().get());
}
}
}
@ -1387,7 +1387,7 @@ JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transP
return perfLimit;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> ddWorker,
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<WorkerDetails> workers, WorkerDetails mWorker, WorkerDetails ddWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
state JsonBuilderObject statusObj;
@ -1398,14 +1398,14 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Writes and conflicts
try {
vector<Future<TraceEventFields>> proxyStatFutures;
std::map<NetworkAddress, std::pair<WorkerInterface, ProcessClass>> workersMap;
std::map<NetworkAddress, WorkerDetails> workersMap;
for (auto const& w : workers) {
workersMap[w.first.address()] = w;
workersMap[w.interf.address()] = w;
}
for (auto &p : db->get().client.proxies) {
auto worker = getWorker(workersMap, p.address());
if (worker.present())
proxyStatFutures.push_back(timeoutError(worker.get().first.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("ProxyMetrics"))), 1.0));
proxyStatFutures.push_back(timeoutError(worker.get().interf.eventLogRequest.getReply(EventLogRequest(LiteralStringRef("ProxyMetrics"))), 1.0));
else
throw all_alternatives_failed(); // We need data from all proxies for this result to be trustworthy
}
@ -1439,8 +1439,8 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Transactions
try {
state TraceEventFields ratekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
state TraceEventFields ratekeeper = wait( timeoutError(ddWorker.interf.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(ddWorker.interf.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
double tpsLimit = ratekeeper.getDouble("TPSLimit");
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
@ -1596,7 +1596,7 @@ static JsonBuilderArray oldTlogFetcher(int* oldLogFaultTolerance, Reference<Asyn
return oldTlogsArray;
}
static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration configuration, ServerCoordinators coordinators, std::vector<std::pair<WorkerInterface, ProcessClass>>& workers, int extraTlogEligibleMachines, int minReplicasRemaining) {
static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration configuration, ServerCoordinators coordinators, std::vector<WorkerDetails>& workers, int extraTlogEligibleMachines, int minReplicasRemaining) {
JsonBuilderObject statusObj;
// without losing data
@ -1605,7 +1605,7 @@ static JsonBuilderObject faultToleranceStatusFetcher(DatabaseConfiguration confi
std::map<NetworkAddress, StringRef> workerZones;
for(auto& worker : workers) {
workerZones[worker.first.address()] = worker.first.locality.zoneId().orDefault(LiteralStringRef(""));
workerZones[worker.interf.address()] = worker.interf.locality.zoneId().orDefault(LiteralStringRef(""));
}
std::map<StringRef, int> coordinatorZoneCounts;
for(auto& coordinator : coordinators.ccf->getConnectionString().coordinators()) {
@ -1800,7 +1800,7 @@ ACTOR Future<JsonBuilderObject> lockedStatusFetcher(Reference<AsyncVar<struct Se
ACTOR Future<StatusReply> clusterGetStatus(
Reference<AsyncVar<struct ServerDBInfo>> db,
Database cx,
vector<std::pair<WorkerInterface, ProcessClass>> workers,
vector<WorkerDetails> workers,
ProcessIssuesMap workerIssues,
ProcessIssuesMap clientIssues,
ClientVersionMap clientVersionMap,
@ -1814,19 +1814,19 @@ ACTOR Future<StatusReply> clusterGetStatus(
// Check if master worker is present
state JsonBuilderArray messages;
state std::set<std::string> status_incomplete_reasons;
state std::pair<WorkerInterface, ProcessClass> mWorker;
state std::pair<WorkerInterface, ProcessClass> ddWorker; // DataDistributor worker
state WorkerDetails mWorker;
state WorkerDetails ddWorker; // DataDistributor worker
try {
// Get the master Worker interface
Optional<std::pair<WorkerInterface, ProcessClass>> _mWorker = getWorker( workers, db->get().master.address() );
Optional<WorkerDetails> _mWorker = getWorker( workers, db->get().master.address() );
if (_mWorker.present()) {
mWorker = _mWorker.get();
} else {
messages.push_back(JsonString::makeMessage("unreachable_master_worker", "Unable to locate the master worker."));
}
// Get the DataDistributor worker interface
Optional<std::pair<WorkerInterface, ProcessClass>> _ddWorker;
Optional<WorkerDetails> _ddWorker;
if (db->get().distributor.present()) {
_ddWorker = getWorker( workers, db->get().distributor.get().address() );
}
@ -1930,7 +1930,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
// in status output is important to give context to error messages in status that reference a storage server role ID.
state std::unordered_map<NetworkAddress, WorkerInterface> address_workers;
for (auto const& worker : workers) {
address_workers[worker.first.address()] = worker.first;
address_workers[worker.interf.address()] = worker.interf;
}
state Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));

View File

@ -30,7 +30,7 @@
typedef std::map< NetworkAddress, std::pair<std::string,UID> > ProcessIssuesMap;
typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > ClientVersionMap;
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<std::pair<WorkerInterface, ProcessClass>> const& workers,
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<WorkerDetails> const& workers,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap,
ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );

View File

@ -72,6 +72,20 @@ struct WorkerInterface {
}
};
struct WorkerDetails {
WorkerInterface interf;
ProcessClass processClass;
bool degraded;
WorkerDetails() : degraded(false) {}
WorkerDetails(const WorkerInterface& interf, ProcessClass processClass, bool degraded) : interf(interf), processClass(processClass), degraded(degraded) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, interf, processClass, degraded);
}
};
struct InitializeTLogRequest {
UID recruitmentID;
LogSystemConfig recoverFrom;

View File

@ -1095,11 +1095,11 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
int minTestersExpected, StringRef startingConfiguration, LocalityData locality ) {
state int flags = (at == TEST_ON_SERVERS ? 0 : GetWorkersRequest::TESTER_CLASS_ONLY) | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY;
state Future<Void> testerTimeout = delay(600.0); // wait 600 sec for testers to show up
state vector<std::pair<WorkerInterface, ProcessClass>> workers;
state vector<WorkerDetails> workers;
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( cc->get().present() ? brokenPromiseToNever( cc->get().get().getWorkers.getReply( GetWorkersRequest( flags ) ) ) : Never() ) ) {
when( vector<WorkerDetails> w = wait( cc->get().present() ? brokenPromiseToNever( cc->get().get().getWorkers.getReply( GetWorkersRequest( flags ) ) ) : Never() ) ) {
if (w.size() >= minTestersExpected) {
workers = w;
break;
@ -1116,7 +1116,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
vector<TesterInterface> ts;
for(int i=0; i<workers.size(); i++)
ts.push_back(workers[i].first.testerInterface);
ts.push_back(workers[i].interf.testerInterface);
wait( runTests( cc, ci, ts, tests, startingConfiguration, locality) );
return Void();

View File

@ -1089,26 +1089,26 @@ struct ConsistencyCheckWorkload : TestWorkload
//Returns false if any worker that should have a storage server does not have one
ACTOR Future<bool> checkForStorage(Database cx, DatabaseConfiguration configuration, ConsistencyCheckWorkload *self)
{
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
state vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
std::set<Optional<Key>> missingStorage;
for( int i = 0; i < workers.size(); i++ ) {
if( !configuration.isExcludedServer(workers[i].first.address()) &&
( workers[i].second == ProcessClass::StorageClass || workers[i].second == ProcessClass::UnsetClass ) ) {
if( !configuration.isExcludedServer(workers[i].interf.address()) &&
( workers[i].processClass == ProcessClass::StorageClass || workers[i].processClass == ProcessClass::UnsetClass ) ) {
bool found = false;
for( int j = 0; j < storageServers.size(); j++ ) {
if( storageServers[j].address() == workers[i].first.address() ) {
if( storageServers[j].address() == workers[i].interf.address() ) {
found = true;
break;
}
}
if( !found ) {
TraceEvent("ConsistencyCheck_NoStorage")
.detail("Address", workers[i].first.address())
.detail("Address", workers[i].interf.address())
.detail("ProcessClassEqualToStorageClass",
(int)(workers[i].second == ProcessClass::StorageClass));
missingStorage.insert(workers[i].first.locality.dcId());
(int)(workers[i].processClass == ProcessClass::StorageClass));
missingStorage.insert(workers[i].interf.locality.dcId());
}
}
}
@ -1125,12 +1125,12 @@ struct ConsistencyCheckWorkload : TestWorkload
}
ACTOR Future<bool> checkForExtraDataStores(Database cx, ConsistencyCheckWorkload *self) {
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
state vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
auto& db = self->dbInfo->get();
state std::vector<TLogInterface> logs = db.logSystemConfig.allPresentLogs();
state std::vector<std::pair<WorkerInterface, ProcessClass>>::iterator itr;
state std::vector<WorkerDetails>::iterator itr;
state bool foundExtraDataStore = false;
state std::map<NetworkAddress, std::set<UID>> statefulProcesses;
@ -1142,19 +1142,19 @@ struct ConsistencyCheckWorkload : TestWorkload
}
for(itr = workers.begin(); itr != workers.end(); ++itr) {
ErrorOr<Standalone<VectorRef<UID>>> stores = wait(itr->first.diskStoreRequest.getReplyUnlessFailedFor(DiskStoreRequest(false), 2, 0));
ErrorOr<Standalone<VectorRef<UID>>> stores = wait(itr->interf.diskStoreRequest.getReplyUnlessFailedFor(DiskStoreRequest(false), 2, 0));
if(stores.isError()) {
TraceEvent("ConsistencyCheck_GetDataStoreFailure").error(stores.getError()).detail("Address", itr->first.address());
TraceEvent("ConsistencyCheck_GetDataStoreFailure").error(stores.getError()).detail("Address", itr->interf.address());
self->testFailure("Failed to get data stores");
return false;
}
for(auto id : stores.get()) {
if(!statefulProcesses[itr->first.address()].count(id)) {
TraceEvent("ConsistencyCheck_ExtraDataStore").detail("Address", itr->first.address()).detail("DataStoreID", id);
if(!statefulProcesses[itr->interf.address()].count(id)) {
TraceEvent("ConsistencyCheck_ExtraDataStore").detail("Address", itr->interf.address()).detail("DataStoreID", id);
if(g_network->isSimulated()) {
TraceEvent("ConsistencyCheck_RebootProcess").detail("Address", itr->first.address()).detail("DataStoreID", id);
g_simulator.rebootProcess(g_simulator.getProcessByAddress(itr->first.address()), ISimulator::RebootProcess);
TraceEvent("ConsistencyCheck_RebootProcess").detail("Address", itr->interf.address()).detail("DataStoreID", id);
g_simulator.rebootProcess(g_simulator.getProcessByAddress(itr->interf.address()), ISimulator::RebootProcess);
}
foundExtraDataStore = true;
@ -1172,17 +1172,17 @@ struct ConsistencyCheckWorkload : TestWorkload
//Returns true if the worker at the given address has the specified machineClass or has an unset class
//The interfaceType paramater is used in a TraceEvent, should be something like (Master, MasterProxy, StorageServer, ...)
bool workerHasClass(vector<std::pair<WorkerInterface, ProcessClass>> workers, NetworkAddress address, ProcessClass::ClassType machineClass, std::string interfaceType)
bool workerHasClass(vector<WorkerDetails> workers, NetworkAddress address, ProcessClass::ClassType machineClass, std::string interfaceType)
{
//Search all workers until the correct one is found
for(int i = 0; i < workers.size(); i++)
{
if(workers[i].first.address() == address)
if(workers[i].interf.address() == address)
{
if(workers[i].second == machineClass || workers[i].second == ProcessClass::UnsetClass)
if(workers[i].processClass == machineClass || workers[i].processClass == ProcessClass::UnsetClass)
return true;
TraceEvent("ConsistencyCheck_InvalidClassType").detail("RequestedClass", workers[i].second.toString())
TraceEvent("ConsistencyCheck_InvalidClassType").detail("RequestedClass", workers[i].processClass.toString())
.detail("ActualClass", ProcessClass(machineClass, ProcessClass::CommandLineSource).toString()).detail("InterfaceType", interfaceType);
return false;
@ -1200,16 +1200,16 @@ struct ConsistencyCheckWorkload : TestWorkload
if(g_simulator.extraDB)
return true;
vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
std::set<NetworkAddress> workerAddresses;
for( auto it : workers ) {
ISimulator::ProcessInfo* info = g_simulator.getProcessByAddress(it.first.address());
ISimulator::ProcessInfo* info = g_simulator.getProcessByAddress(it.interf.address());
if(!info || info->failed) {
TraceEvent("ConsistencyCheck_FailedWorkerInList").detail("Addr", it.first.address());
TraceEvent("ConsistencyCheck_FailedWorkerInList").detail("Addr", it.interf.address());
return false;
}
workerAddresses.insert( NetworkAddress(it.first.address().ip, it.first.address().port, true, false) );
workerAddresses.insert( NetworkAddress(it.interf.address().ip, it.interf.address().port, true, false) );
}
vector<ISimulator::ProcessInfo*> all = g_simulator.getAllProcesses();
@ -1281,34 +1281,33 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
typedef std::pair<WorkerInterface, ProcessClass> WorkerClassPair;
//Returns true if all machines in the cluster that specified a desired class are operating in that class
ACTOR Future<bool> checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self) {
state Optional<Key> expectedPrimaryDcId;
state Optional<Key> expectedRemoteDcId;
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
state vector<WorkerClassPair> allWorkers = wait(getWorkers(self->dbInfo));
state vector<WorkerClassPair> nonExcludedWorkers = wait(getWorkers(self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY));
state vector<WorkerDetails> allWorkers = wait(getWorkers(self->dbInfo));
state vector<WorkerDetails> nonExcludedWorkers = wait(getWorkers(self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY));
auto& db = self->dbInfo->get();
std::map<NetworkAddress, WorkerClassPair> allWorkerProcessMap;
std::map<NetworkAddress, WorkerDetails> allWorkerProcessMap;
std::map<Optional<Key>, std::vector<ProcessClass::ClassType>> dcToAllClassTypes;
for (auto worker : allWorkers) {
allWorkerProcessMap[worker.first.address()] = worker;
Optional<Key> dc = worker.first.locality._data[LocalityData::keyDcId];
allWorkerProcessMap[worker.interf.address()] = worker;
Optional<Key> dc = worker.interf.locality._data[LocalityData::keyDcId];
if (!dcToAllClassTypes.count(dc))
dcToAllClassTypes.insert({});
dcToAllClassTypes[dc].push_back(worker.second.classType());
dcToAllClassTypes[dc].push_back(worker.processClass.classType());
}
std::map<NetworkAddress, WorkerClassPair> nonExcludedWorkerProcessMap;
std::map<NetworkAddress, WorkerDetails> nonExcludedWorkerProcessMap;
std::map<Optional<Key>, std::vector<ProcessClass::ClassType>> dcToNonExcludedClassTypes;
for (auto worker : nonExcludedWorkers) {
nonExcludedWorkerProcessMap[worker.first.address()] = worker;
Optional<Key> dc = worker.first.locality._data[LocalityData::keyDcId];
nonExcludedWorkerProcessMap[worker.interf.address()] = worker;
Optional<Key> dc = worker.interf.locality._data[LocalityData::keyDcId];
if (!dcToNonExcludedClassTypes.count(dc))
dcToNonExcludedClassTypes.insert({});
dcToNonExcludedClassTypes[dc].push_back(worker.second.classType());
dcToNonExcludedClassTypes[dc].push_back(worker.processClass.classType());
}
if (!allWorkerProcessMap.count(db.clusterInterface.clientInterface.address())) {
@ -1320,8 +1319,8 @@ struct ConsistencyCheckWorkload : TestWorkload
return false;
}
Optional<Key> ccDcId = allWorkerProcessMap[db.clusterInterface.clientInterface.address()].first.locality._data[LocalityData::keyDcId];
Optional<Key> masterDcId = allWorkerProcessMap[db.master.address()].first.locality._data[LocalityData::keyDcId];
Optional<Key> ccDcId = allWorkerProcessMap[db.clusterInterface.clientInterface.address()].interf.locality._data[LocalityData::keyDcId];
Optional<Key> masterDcId = allWorkerProcessMap[db.master.address()].interf.locality._data[LocalityData::keyDcId];
if (ccDcId != masterDcId) {
TraceEvent("ConsistencyCheck_CCAndMasterNotInSameDC").detail("ClusterControllerDcId", getOptionalString(ccDcId)).detail("MasterDcId", getOptionalString(masterDcId));
@ -1351,8 +1350,8 @@ struct ConsistencyCheckWorkload : TestWorkload
// Check CC
ProcessClass::Fitness bestClusterControllerFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[ccDcId], ProcessClass::ClusterController);
if (!nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) || nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].second.machineClassFitness(ProcessClass::ClusterController) != bestClusterControllerFitness) {
TraceEvent("ConsistencyCheck_ClusterControllerNotBest").detail("BestClusterControllerFitness", bestClusterControllerFitness).detail("ExistingClusterControllerFit", nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) ? nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].second.machineClassFitness(ProcessClass::ClusterController) : -1);
if (!nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) || nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].processClass.machineClassFitness(ProcessClass::ClusterController) != bestClusterControllerFitness) {
TraceEvent("ConsistencyCheck_ClusterControllerNotBest").detail("BestClusterControllerFitness", bestClusterControllerFitness).detail("ExistingClusterControllerFit", nonExcludedWorkerProcessMap.count(db.clusterInterface.clientInterface.address()) ? nonExcludedWorkerProcessMap[db.clusterInterface.clientInterface.address()].processClass.machineClassFitness(ProcessClass::ClusterController) : -1);
return false;
}
@ -1365,16 +1364,16 @@ struct ConsistencyCheckWorkload : TestWorkload
}
}
if ((!nonExcludedWorkerProcessMap.count(db.master.address()) && bestMasterFitness != ProcessClass::ExcludeFit) || nonExcludedWorkerProcessMap[db.master.address()].second.machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
TraceEvent("ConsistencyCheck_MasterNotBest").detail("BestMasterFitness", bestMasterFitness).detail("ExistingMasterFit", nonExcludedWorkerProcessMap.count(db.master.address()) ? nonExcludedWorkerProcessMap[db.master.address()].second.machineClassFitness(ProcessClass::Master) : -1);
if ((!nonExcludedWorkerProcessMap.count(db.master.address()) && bestMasterFitness != ProcessClass::ExcludeFit) || nonExcludedWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
TraceEvent("ConsistencyCheck_MasterNotBest").detail("BestMasterFitness", bestMasterFitness).detail("ExistingMasterFit", nonExcludedWorkerProcessMap.count(db.master.address()) ? nonExcludedWorkerProcessMap[db.master.address()].processClass.machineClassFitness(ProcessClass::Master) : -1);
return false;
}
// Check proxy
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Proxy);
for (auto masterProxy : db.client.proxies) {
if (!nonExcludedWorkerProcessMap.count(masterProxy.address()) || nonExcludedWorkerProcessMap[masterProxy.address()].second.machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
TraceEvent("ConsistencyCheck_ProxyNotBest").detail("BestMasterProxyFitness", bestMasterProxyFitness).detail("ExistingMasterProxyFitness", nonExcludedWorkerProcessMap.count(masterProxy.address()) ? nonExcludedWorkerProcessMap[masterProxy.address()].second.machineClassFitness(ProcessClass::Proxy) : -1);
if (!nonExcludedWorkerProcessMap.count(masterProxy.address()) || nonExcludedWorkerProcessMap[masterProxy.address()].processClass.machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
TraceEvent("ConsistencyCheck_ProxyNotBest").detail("BestMasterProxyFitness", bestMasterProxyFitness).detail("ExistingMasterProxyFitness", nonExcludedWorkerProcessMap.count(masterProxy.address()) ? nonExcludedWorkerProcessMap[masterProxy.address()].processClass.machineClassFitness(ProcessClass::Proxy) : -1);
return false;
}
}
@ -1382,8 +1381,8 @@ struct ConsistencyCheckWorkload : TestWorkload
// Check resolver
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(dcToNonExcludedClassTypes[masterDcId], ProcessClass::Resolver);
for (auto resolver : db.resolvers) {
if (!nonExcludedWorkerProcessMap.count(resolver.address()) || nonExcludedWorkerProcessMap[resolver.address()].second.machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
TraceEvent("ConsistencyCheck_ResolverNotBest").detail("BestResolverFitness", bestResolverFitness).detail("ExistingResolverFitness", nonExcludedWorkerProcessMap.count(resolver.address()) ? nonExcludedWorkerProcessMap[resolver.address()].second.machineClassFitness(ProcessClass::Resolver) : -1);
if (!nonExcludedWorkerProcessMap.count(resolver.address()) || nonExcludedWorkerProcessMap[resolver.address()].processClass.machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
TraceEvent("ConsistencyCheck_ResolverNotBest").detail("BestResolverFitness", bestResolverFitness).detail("ExistingResolverFitness", nonExcludedWorkerProcessMap.count(resolver.address()) ? nonExcludedWorkerProcessMap[resolver.address()].processClass.machineClassFitness(ProcessClass::Resolver) : -1);
return false;
}
}

View File

@ -69,11 +69,11 @@ struct CpuProfilerWorkload : TestWorkload
//If we are turning the profiler on, get a list of workers in the system
if(enabled)
{
vector<std::pair<WorkerInterface, ProcessClass>> _workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerDetails> _workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerInterface> workers;
for(int i = 0; i < _workers.size(); i++) {
if (self->roles.empty() || std::find(self->roles.cbegin(), self->roles.cend(), _workers[i].second.toString()) != self->roles.cend()) {
workers.push_back(_workers[i].first);
if (self->roles.empty() || std::find(self->roles.cbegin(), self->roles.cend(), _workers[i].processClass.toString()) != self->roles.cend()) {
workers.push_back(_workers[i].interf);
}
}
self->profilingWorkers = workers;

View File

@ -54,12 +54,12 @@ struct LogMetricsWorkload : TestWorkload {
ACTOR Future<Void> setSystemRate( LogMetricsWorkload *self, Database cx, uint32_t rate ) {
// set worker interval and ss interval
state BinaryWriter br(Unversioned());
vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
//vector<Future<Void>> replies;
TraceEvent("RateChangeTrigger");
SetMetricsLogRateRequest req(rate);
for(int i = 0; i < workers.size(); i++) {
workers[i].first.setMetricsRate.send( req );
workers[i].interf.setMetricsRate.send( req );
}
//wait( waitForAll( replies ) );

View File

@ -103,11 +103,11 @@ struct PerformanceWorkload : TestWorkload {
//FIXME: does not use testers which are recruited on workers
ACTOR Future<vector<TesterInterface>> getTesters( PerformanceWorkload *self) {
state vector<std::pair<WorkerInterface, ProcessClass>> workers;
state vector<WorkerDetails> workers;
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( self->dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( GetWorkersRequest::TESTER_CLASS_ONLY | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) ) ) ) ) {
when( vector<WorkerDetails> w = wait( brokenPromiseToNever( self->dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( GetWorkersRequest::TESTER_CLASS_ONLY | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) ) ) ) ) {
workers = w;
break;
}
@ -117,7 +117,7 @@ struct PerformanceWorkload : TestWorkload {
vector<TesterInterface> ts;
for(int i=0; i<workers.size(); i++)
ts.push_back(workers[i].first.testerInterface);
ts.push_back(workers[i].interf.testerInterface);
return ts;
}

View File

@ -180,10 +180,10 @@ struct PingWorkload : TestWorkload {
}
ACTOR Future<Void> workerPinger( PingWorkload* self ) {
vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
vector<RequestStream<LoadedPingRequest>> peers;
for(int i=0; i<workers.size(); i++)
peers.push_back( workers[i].first.debugPing );
peers.push_back( workers[i].interf.debugPing );
vector<Future<Void>> pingers;
for(int i=0; i<self->actorCount; i++)
pingers.push_back( self->pinger( self, peers ) );
@ -208,9 +208,9 @@ struct PingWorkload : TestWorkload {
state Future<Void> collection = actorCollection( addActor.getFuture() );
if( self->workerBroadcast ) {
vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
for( int i=0; i<workers.size(); i++ )
endpoints.push_back( workers[i].first.debugPing );
endpoints.push_back( workers[i].interf.debugPing );
} else {
vector<PingWorkloadInterface> peers = wait( self->fetchInterfaces( self, cx ) );
for( int i=0; i<peers.size(); i++ )

View File

@ -225,11 +225,11 @@ struct ReadWriteWorkload : KVWorkload {
ACTOR static Future<bool> traceDumpWorkers( Reference<AsyncVar<ServerDBInfo>> db ) {
try {
loop {
ErrorOr<vector<std::pair<WorkerInterface, ProcessClass>>> workerList = wait( db->get().clusterInterface.getWorkers.tryGetReply( GetWorkersRequest() ) );
ErrorOr<vector<WorkerDetails>> workerList = wait( db->get().clusterInterface.getWorkers.tryGetReply( GetWorkersRequest() ) );
if( workerList.present() ) {
std::vector<Future<ErrorOr<Void>>> dumpRequests;
for( int i = 0; i < workerList.get().size(); i++)
dumpRequests.push_back( workerList.get()[i].first.traceBatchDumpRequest.tryGetReply( TraceBatchDumpRequest() ) );
dumpRequests.push_back( workerList.get()[i].interf.traceBatchDumpRequest.tryGetReply( TraceBatchDumpRequest() ) );
wait( waitForAll( dumpRequests ) );
return true;
}

View File

@ -61,14 +61,14 @@ struct TargetedKillWorkload : TestWorkload {
return Void();
}
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
state vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
int killed = 0;
for( int i = 0; i < workers.size(); i++ ) {
if( workers[i].first.master.getEndpoint().getPrimaryAddress() == address ||
( self->killAllMachineProcesses && workers[i].first.master.getEndpoint().getPrimaryAddress().ip == address.ip && workers[i].second != ProcessClass::TesterClass ) ) {
TraceEvent("WorkerKill").detail("TargetedMachine", address).detail("Worker", workers[i].first.id());
workers[i].first.clientInterface.reboot.send( RebootRequest() );
if( workers[i].interf.master.getEndpoint().getPrimaryAddress() == address ||
( self->killAllMachineProcesses && workers[i].interf.master.getEndpoint().getPrimaryAddress().ip == address.ip && workers[i].processClass != ProcessClass::TesterClass ) ) {
TraceEvent("WorkerKill").detail("TargetedMachine", address).detail("Worker", workers[i].interf.id());
workers[i].interf.clientInterface.reboot.send( RebootRequest() );
}
}

View File

@ -42,10 +42,10 @@ struct WorkerErrorsWorkload : TestWorkload {
virtual void getMetrics( vector<PerfMetric>& m ) {}
ACTOR Future< std::vector< TraceEventFields > > latestEventOnWorkers( std::vector<std::pair<WorkerInterface, ProcessClass>> workers ) {
ACTOR Future< std::vector< TraceEventFields > > latestEventOnWorkers( std::vector<WorkerDetails> workers ) {
state vector<Future<TraceEventFields>> eventTraces;
for(int c = 0; c < workers.size(); c++) {
eventTraces.push_back( workers[c].first.eventLogRequest.getReply( EventLogRequest() ) );
eventTraces.push_back( workers[c].interf.eventLogRequest.getReply( EventLogRequest() ) );
}
wait( timeoutError( waitForAll( eventTraces ), 2.0 ) );
@ -59,7 +59,7 @@ struct WorkerErrorsWorkload : TestWorkload {
}
ACTOR Future<Void> _start(Database cx, WorkerErrorsWorkload *self) {
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
state vector<WorkerDetails> workers = wait( getWorkers( self->dbInfo ) );
std::vector<TraceEventFields> errors = wait( self->latestEventOnWorkers( workers ) );
for(auto e : errors) {
printf("%s\n", e.toString().c_str());