Merge remote-tracking branch 'origin/master' into bindings-versionstamps-in-tuples

This commit is contained in:
Alec Grieser 2017-10-24 18:08:47 -07:00
commit 1855f876db
17 changed files with 138 additions and 69 deletions

View File

@ -116,6 +116,9 @@ public class Subspace
}
/**
* Create a human-readable string representation of this subspace. This is
* really only useful for debugging purposes, but it includes information
* on what raw prefix the subspace is using.
* @return a printable representation of the subspace
*/
@Override
@ -123,6 +126,16 @@ public class Subspace
return "Subspace(rawPrefix=" + printable(rawPrefix) + ")";
}
/**
* Returns a hash-table compatible hash of this subspace. This is based off
* of the hash of the underlying byte-array prefix.
* @return a hash of this subspace
*/
@Override
public int hashCode() {
return Arrays.hashCode(rawPrefix);
}
/**
* Gets a new subspace which is equivalent to this subspace with its prefix {@link Tuple} extended by
* the specified {@code Object}. The object will be inserted into a {@link Tuple} and passed to {@link #get(Tuple)}.

View File

@ -116,6 +116,9 @@ public class Subspace
}
/**
* Create a human-readable string representation of this subspace. This is
* really only useful for debugging purposes, but it includes information
* on what raw prefix the subspace is using.
* @return a printable representation of the subspace
*/
@Override
@ -123,6 +126,16 @@ public class Subspace
return "Subspace(rawPrefix=" + printable(rawPrefix) + ")";
}
/**
* Returns a hash-table compatible hash of this subspace. This is based off
* of the hash of the underlying byte-array prefix.
* @return a hash of this subspace
*/
@Override
public int hashCode() {
return Arrays.hashCode(rawPrefix);
}
/**
* Gets a new subspace which is equivalent to this subspace with its prefix {@link Tuple} extended by
* the specified {@code Object}. The object will be inserted into a {@link Tuple} and passed to {@link #get(Tuple)}.
@ -184,7 +197,7 @@ public class Subspace
* @return the key encoding the specified tuple in this {@code Subspace}
*/
public byte[] pack(Tuple tuple) {
return join(rawPrefix, tuple.pack());
return tuple.pack(rawPrefix);
}
/**

View File

@ -698,6 +698,8 @@ void printStatus(StatusObjectReader statusObj, StatusClient::StatusLevel level,
description += format("\nNeed at least %d log servers, %d proxies and %d resolvers.", recoveryState["required_logs"].get_int(), recoveryState["required_proxies"].get_int(), recoveryState["required_resolvers"].get_int());
if (statusObjCluster.has("machines") && statusObjCluster.has("processes"))
description += format("\nHave %d processes on %d machines.", statusObjCluster["processes"].get_obj().size(), statusObjCluster["machines"].get_obj().size());
} else if (name == "locking_old_transaction_servers" && recoveryState["missing_logs"].get_str().size()) {
description += format("\nNeed one or more of the following log servers: %s", recoveryState["missing_logs"].get_str().c_str());
}
description = lineWrap(description.c_str(), 80);
if (!printedCoordinators && (

View File

@ -998,7 +998,7 @@ namespace fileBackup {
state Key nextKey = keyAfter(lastKey);
Void _ = wait(saveAndExtendIncrementally(cx, taskBucket, task,
endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, endKey, outVersion),
endKeyRangeFile(cx, backup, &rangeFile, backupContainer, &outFileName, nextKey, outVersion),
timeout // time at which to do the first saveAndExtend
)
);

View File

@ -563,6 +563,18 @@ public:
return resolverCount > r.resolverCount;
}
bool betterInDatacenterFitness (InDatacenterFitness const& r) const {
int lmax = std::max(resolverFit,proxyFit);
int lmin = std::min(resolverFit,proxyFit);
int rmax = std::max(r.resolverFit,r.proxyFit);
int rmin = std::min(r.resolverFit,r.proxyFit);
if( lmax != rmax ) return lmax < rmax;
if( lmin != rmin ) return lmin < rmin;
return false;
}
bool operator == (InDatacenterFitness const& r) const { return proxyFit == r.proxyFit && resolverFit == r.resolverFit && proxyCount == r.proxyCount && resolverCount == r.resolverCount; }
};
@ -689,6 +701,10 @@ public:
id_used[masterProcessId]++;
ProcessClass::Fitness oldMasterFit = masterWorker->second.processClass.machineClassFitness( ProcessClass::Master );
if(db.config.isExcludedServer(dbi.master.address())) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::WorstFit);
}
ProcessClass::Fitness newMasterFit = getMasterWorker(db.config, true).second.machineClassFitness( ProcessClass::Master );
if(dbi.recoveryState < RecoveryState::FULLY_RECOVERED) {
@ -751,7 +767,8 @@ public:
newInFit = fitness;
}
if(oldInFit < newInFit) return false;
if(oldInFit.betterInDatacenterFitness(newInFit)) return false;
if(oldMasterFit > newMasterFit || oldAcrossFit > newAcrossFit || oldInFit > newInFit) {
TraceEvent("BetterMasterExists", id).detail("oldMasterFit", oldMasterFit).detail("newMasterFit", newMasterFit)
.detail("oldAcrossFitC", oldAcrossFit.tlogCount).detail("newAcrossFitC", newAcrossFit.tlogCount)
@ -1663,18 +1680,22 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
registerWorker( req, &self );
}
when( GetWorkersRequest req = waitNext( interf.getWorkers.getFuture() ) ) {
if ( req.flags & GetWorkersRequest::FLAG_TESTER_CLASS ) {
vector<std::pair<WorkerInterface, ProcessClass>> testers;
for(auto& it : self.id_worker)
if (it.second.processClass.classType() == ProcessClass::TesterClass)
testers.push_back(std::make_pair(it.second.interf, it.second.processClass));
req.reply.send( testers );
} else {
vector<std::pair<WorkerInterface, ProcessClass>> workers;
for(auto& it : self.id_worker)
workers.push_back(std::make_pair(it.second.interf, it.second.processClass));
req.reply.send( workers );
vector<std::pair<WorkerInterface, ProcessClass>> workers;
auto masterAddr = self.db.serverInfo->get().master.address();
for(auto& it : self.id_worker) {
if ( (req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) && self.db.config.isExcludedServer(it.second.interf.address()) ) {
continue;
}
if ( (req.flags & GetWorkersRequest::TESTER_CLASS_ONLY) && it.second.processClass.classType() != ProcessClass::TesterClass ) {
continue;
}
workers.push_back(std::make_pair(it.second.interf, it.second.processClass));
}
req.reply.send( workers );
}
when( GetClientWorkersRequest req = waitNext( interf.clientInterface.getClientWorkers.getFuture() ) ) {
vector<ClientWorkerInterface> workers;

View File

@ -130,7 +130,7 @@ struct RegisterWorkerRequest {
};
struct GetWorkersRequest {
enum { FLAG_TESTER_CLASS = 1 };
enum { TESTER_CLASS_ONLY = 0x1, NON_EXCLUDED_PROCESSES_ONLY = 0x2 };
int flags;
ReplyPromise<vector<std::pair<WorkerInterface, ProcessClass>>> reply;

View File

@ -30,10 +30,10 @@
#include "Status.h"
#include "fdbclient/ManagementAPI.h"
ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo ) {
ACTOR Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> dbInfo, int flags = 0 ) {
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest() ) ) ) ) {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( flags ) ) ) ) ) {
return w;
}
when( Void _ = wait( dbInfo->onChange() ) ) {}

View File

@ -33,7 +33,7 @@ Future<int64_t> getMaxTLogQueueSize( Database const& cx, Reference<AsyncVar<stru
Future<int64_t> getMaxStorageServerQueueSize( Database const& cx, Reference<AsyncVar<struct ServerDBInfo>> const& );
Future<int64_t> getDataDistributionQueueSize( Database const &cx, Reference<AsyncVar<struct ServerDBInfo>> const&, bool const& reportInFlight );
Future<vector<StorageServerInterface>> getStorageServers( Database const& cx, bool const &use_system_priority = false);
Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
Future<vector<std::pair<WorkerInterface, ProcessClass>>> getWorkers( Reference<AsyncVar<ServerDBInfo>> const& dbInfo, int const& flags = 0 );
Future<WorkerInterface> getMasterWorker( Database const& cx, Reference<AsyncVar<ServerDBInfo>> const& dbInfo );
//Waits for f to complete. If simulated, disables connection failures after waiting a specified amount of time

View File

@ -904,11 +904,11 @@ static StatusObject clientStatusFetcher(ClientVersionMap clientVersionMap) {
return clientStatus;
}
ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, std::string dbName, int workerCount, std::set<std::string> *incomplete_reasons) {
ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInterface, ProcessClass> mWorker, int workerCount, std::set<std::string> *incomplete_reasons) {
state StatusObject message;
try {
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(StringRef(dbName+"/MasterRecoveryState") ) ), 1.0) );
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( extractAttribute(md, LiteralStringRef("StatusCode")) );
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -926,6 +926,8 @@ ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInt
message["required_logs"] = requiredLogs;
message["required_proxies"] = requiredProxies;
message["required_resolvers"] = requiredResolvers;
} else if (mStatusCode == RecoveryStatus::locking_old_transaction_servers) {
message["missing_logs"] = extractAttribute(md, LiteralStringRef("MissingIDs")).c_str();
}
// TODO: time_in_recovery: 0.5
// time_in_state: 0.1
@ -1744,7 +1746,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// construct status information for cluster subsections
state StatusObject recoveryStateStatus = wait(recoveryStateStatusFetcher(mWorker, dbName, workers.size(), &status_incomplete_reasons));
state StatusObject recoveryStateStatus = wait(recoveryStateStatusFetcher(mWorker, workers.size(), &status_incomplete_reasons));
// machine metrics
state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents();

View File

@ -28,6 +28,7 @@
#include "fdbrpc/simulator.h"
#include "fdbrpc/Replication.h"
#include "fdbrpc/ReplicationUtils.h"
#include "RecoveryState.h"
template <class Collection>
void uniquify( Collection& c ) {
@ -648,6 +649,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
std::vector<TLogLockResult> results;
std::string sServerState;
LocalityGroup unResponsiveSet;
std::string missingServerIds;
double t = timer();
cycles ++;
@ -660,6 +663,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
else {
unResponsiveSet.add(prevState.tLogLocalities[t]);
sServerState += 'f';
if(missingServerIds.size()) {
missingServerIds += ", ";
}
missingServerIds += logServers[t]->get().toString();
}
}
@ -773,22 +780,12 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
.detail("LogZones", ::describeZones(prevState.tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(prevState.tLogLocalities));
}
}
// Too many failures
else {
TraceEvent("LogSystemWaitingForRecovery", dbgid).detail("Cycles", cycles)
.detail("AvailableServers", results.size())
.detail("TotalServers", logServers.size())
.detail("Present", results.size())
.detail("Available", availableItems.size())
.detail("Absent", logServers.size() - results.size())
.detail("ServerState", sServerState)
.detail("ReplicationFactor", prevState.tLogReplicationFactor)
.detail("AntiQuorum", prevState.tLogWriteAntiQuorum)
.detail("Policy", prevState.tLogPolicy->info())
.detail("TooManyFailures", bTooManyFailures)
.detail("LogZones", ::describeZones(prevState.tLogLocalities))
.detail("LogDataHalls", ::describeDataHalls(prevState.tLogLocalities));
} else {
TraceEvent("MasterRecoveryState", dbgid)
.detail("StatusCode", RecoveryStatus::locking_old_transaction_servers)
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_old_transaction_servers])
.detail("MissingIDs", missingServerIds)
.trackLatest("MasterRecoveryState");
}
// Wait for anything relevant to change

View File

@ -452,7 +452,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", status)
.detail("Status", RecoveryStatus::names[status])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
return Never();
} else
TraceEvent("MasterRecoveryState", self->dbgid)
@ -465,7 +465,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
.detail("RequiredResolvers", 1)
.detail("DesiredResolvers", self->configuration.getDesiredResolvers())
.detail("storeType", self->configuration.storageServerStoreType)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
RecruitFromConfigurationReply recruits = wait(
brokenPromiseToNever( self->clusterController.recruitFromConfiguration.getReply(
@ -477,7 +477,7 @@ ACTOR Future<Void> recruitEverything( Reference<MasterData> self, vector<Storage
.detail("Proxies", recruits.proxies.size())
.detail("TLogs", recruits.tLogs.size())
.detail("Resolvers", recruits.resolvers.size())
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Actually, newSeedServers does both the recruiting and initialization of the seed servers; so if this is a brand new database we are sort of lying that we are
// past the recruitment phase. In a perfect world we would split that up so that the recruitment part happens above (in parallel with recruiting the transaction servers?).
@ -637,7 +637,7 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_transaction_system_state])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
self->hasConfiguration = false;
if(BUGGIFY)
@ -966,7 +966,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::reading_coordinated_state)
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_coordinated_state])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
Value prevDBStateRaw = wait( self->cstate1.read() );
addActor.send( masterTerminateOnConflict( self, self->cstate1.onConflict() ) );
@ -981,7 +981,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("TLogs", self->prevDBState.tLogs.size())
.detail("MyRecoveryCount", self->prevDBState.recoveryCount+2)
.detail("StateSize", prevDBStateRaw.size())
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
state Reference<AsyncVar<Reference<ILogSystem>>> oldLogSystems( new AsyncVar<Reference<ILogSystem>> );
state Future<Void> recoverAndEndEpoch = ILogSystem::recoverAndEndEpoch(oldLogSystems, self->dbgid, self->prevDBState, self->myInterface.tlogRejoin.getFuture(), self->myInterface.locality);
@ -999,7 +999,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::locking_old_transaction_servers)
.detail("Status", RecoveryStatus::names[RecoveryStatus::locking_old_transaction_servers])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.detail("MissingIDs", "")
.trackLatest("MasterRecoveryState");
loop {
Reference<ILogSystem> oldLogSystem = oldLogSystems->get();
@ -1025,7 +1026,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::recovery_transaction)
.detail("Status", RecoveryStatus::names[RecoveryStatus::recovery_transaction])
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Recovery transaction
state bool debugResult = debug_checkMinRestoredVersion( UID(), self->lastEpochEnd, "DBRecovery", SevWarn );
@ -1101,7 +1102,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("Status", RecoveryStatus::names[RecoveryStatus::writing_coordinated_state])
.detail("TLogs", self->logSystem->getLogServerCount())
.detail("TLogList", self->logSystem->describe())
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Multiple masters prevent conflicts between themselves via CoordinatedState (self->cstate)
// 1. If SetMaster succeeds, then by CS's contract, these "new" Tlogs are the immediate
@ -1137,7 +1138,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self, PromiseStream<Future<
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.detail("storeType", self->configuration.storageServerStoreType)
.trackLatest(format("%s/MasterRecoveryState", printable(self->dbName).c_str() ).c_str());
.trackLatest("MasterRecoveryState");
// Now that recovery is complete, we register ourselves with the cluster controller, so that the client and server information
// it hands out can be updated

View File

@ -1079,7 +1079,7 @@ ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControlle
ACTOR Future<Void> runTests( Reference<AsyncVar<Optional<struct ClusterControllerFullInterface>>> cc,
Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, vector<TestSpec> tests, test_location_t at,
int minTestersExpected, StringRef startingConfiguration, LocalityData locality ) {
state int flags = at == TEST_ON_SERVERS ? 0 : GetWorkersRequest::FLAG_TESTER_CLASS;
state int flags = (at == TEST_ON_SERVERS ? 0 : GetWorkersRequest::TESTER_CLASS_ONLY) | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY;
state Future<Void> testerTimeout = delay(60.0); // wait 60 sec for testers to show up
state vector<std::pair<WorkerInterface, ProcessClass>> workers;

View File

@ -241,11 +241,10 @@ struct ConsistencyCheckWorkload : TestWorkload
bool hasStorage = wait( self->checkForStorage(cx, configuration, self) );
bool hasExtraStores = wait( self->checkForExtraDataStores(cx, self) );
//SOMEDAY: enable this check when support for background reassigning server type is supported
//Check that each machine is operating as its desired class
/*bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
bool usingDesiredClasses = wait(self->checkUsingDesiredClasses(cx, self));
if(!usingDesiredClasses)
self->testFailure("Cluster has machine(s) not using requested classes");*/
self->testFailure("Cluster has machine(s) not using requested classes");
bool workerListCorrect = wait( self->checkWorkerList(cx, self) );
if(!workerListCorrect)
@ -1176,32 +1175,53 @@ struct ConsistencyCheckWorkload : TestWorkload
return true;
}
static ProcessClass::Fitness getBestAvailableFitness(std::set<ProcessClass::ClassType>& availableClassTypes, ProcessClass::ClusterRole role) {
ProcessClass::Fitness bestAvailableFitness = ProcessClass::NeverAssign;
for (auto classType : availableClassTypes) {
bestAvailableFitness = std::min(bestAvailableFitness, ProcessClass(classType, ProcessClass::InvalidSource).machineClassFitness(role));
}
return bestAvailableFitness;
}
//Returns true if all machines in the cluster that specified a desired class are operating in that class
ACTOR Future<bool> checkUsingDesiredClasses(Database cx, ConsistencyCheckWorkload *self)
{
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo, GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) );
state vector<StorageServerInterface> storageServers = wait( getStorageServers( cx ) );
auto& db = self->dbInfo->get();
//Check master server
if(!self->workerHasClass(workers, db.master.address(), ProcessClass::ResolutionClass, "Master"))
std::set<ProcessClass::ClassType> availableClassTypes;
std::map<NetworkAddress, ProcessClass> workerProcessMap;
for (auto worker : workers) {
availableClassTypes.insert(worker.second.classType());
workerProcessMap[worker.first.address()] = worker.second;
}
// Check master
ProcessClass::Fitness bestMasterFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Master);
if (!workerProcessMap.count(db.master.address()) || workerProcessMap[db.master.address()].machineClassFitness(ProcessClass::Master) != bestMasterFitness) {
return false;
}
//Check master proxies
for(int i = 0; i < db.client.proxies.size(); i++)
if(!self->workerHasClass(workers, db.client.proxies[i].address(), ProcessClass::TransactionClass, "MasterProxy"))
// Check master proxy
ProcessClass::Fitness bestMasterProxyFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Proxy);
for (auto masterProxy : db.client.proxies) {
if (!workerProcessMap.count(masterProxy.address()) || workerProcessMap[masterProxy.address()].machineClassFitness(ProcessClass::Proxy) != bestMasterProxyFitness) {
return false;
}
}
//Check storage servers
for(int i = 0; i < storageServers.size(); i++)
if(!self->workerHasClass(workers, storageServers[i].address(), ProcessClass::StorageClass, "StorageServer"))
// Check master resolver
ProcessClass::Fitness bestResolverFitness = getBestAvailableFitness(availableClassTypes, ProcessClass::Resolver);
for (auto resolver : db.resolvers) {
if (!workerProcessMap.count(resolver.address()) || workerProcessMap[resolver.address()].machineClassFitness(ProcessClass::Resolver) != bestResolverFitness) {
return false;
}
}
//Check tlogs
std::vector<TLogInterface> logs = db.logSystemConfig.allPresentLogs();
for(int i = 0; i < logs.size(); i++)
if(!self->workerHasClass(workers, logs[i].address(), ProcessClass::TransactionClass, "TLog"))
return false;
// TODO: Check Tlog and cluster controller
return true;
}

View File

@ -107,7 +107,7 @@ struct PerformanceWorkload : TestWorkload {
loop {
choose {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( self->dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( GetWorkersRequest::FLAG_TESTER_CLASS ) ) ) ) ) {
when( vector<std::pair<WorkerInterface, ProcessClass>> w = wait( brokenPromiseToNever( self->dbInfo->get().clusterInterface.getWorkers.getReply( GetWorkersRequest( GetWorkersRequest::TESTER_CLASS_ONLY | GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY ) ) ) ) ) {
workers = w;
break;
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long