Fixed support for the RemoveServersSafely workload
Added availability functions to the simulator
parent 93509133ad
commit 0b9ed67e12
@@ -944,9 +944,27 @@ public:
 		return m;
 	}
+	virtual bool isAvailable() const
+	{
+		std::vector<ProcessInfo*> processesLeft, processesDead;
+
+		for (auto processInfo : getAllProcesses()) {
+			// Add non-test processes (ie. datahall is not set for test processes)
+			if (processInfo->startingClass != ProcessClass::TesterClass) {
+				// Do not kill protected processes
+				if (protectedAddresses.count(processInfo->address))
+					processesLeft.push_back(processInfo);
+				else if (processInfo->isAvailable())
+					processesLeft.push_back(processInfo);
+				else
+					processesDead.push_back(processInfo);
+			}
+		}
+		return canKillProcesses(processesLeft, processesDead, KillInstantly, NULL);
+	}
+
 	// The following function will determine if the specified configuration of available and dead processes can allow the cluster to survive
-	virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType)
+	virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const
 	{
 		bool canSurvive = true;
 		KillType newKt = kt;
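The new isAvailable() is a whole-cluster availability probe: it partitions every non-tester process into survivors and casualties, then asks canKillProcesses whether the surviving set still satisfies the replication policies. A minimal standalone sketch of that partition-and-check pattern (the Process struct and canSurvive predicate are illustrative stand-ins, not the shipped ProcessInfo/canKillProcesses):

	#include <vector>

	// Stand-in process record: only the fields the partition step reads.
	struct Process { bool isTester; bool isProtected; bool available; };

	template <class F>
	bool clusterAvailable(std::vector<Process> const& all, F canSurvive) {
		std::vector<Process const*> left, dead;
		for (Process const& p : all) {
			if (p.isTester) continue;           // tester processes are ignored entirely
			if (p.isProtected || p.available)
				left.push_back(&p);             // protected processes always count as survivors
			else
				dead.push_back(&p);
		}
		return canSurvive(left, dead);          // delegate the survivability decision
	}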
@@ -1017,12 +1035,12 @@ public:
 		TEST( kt == InjectFaults ); // Simulated machine was killed with faults

 		if (kt == KillInstantly) {
-			TraceEvent(SevWarn, "FailMachine").detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).backtrace();
+			TraceEvent(SevWarn, "FailMachine").detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).backtrace();
 			// This will remove all the "tracked" messages that came from the machine being killed
 			latestEventCache.clear();
 			machine->failed = true;
 		} else if (kt == InjectFaults) {
-			TraceEvent(SevWarn, "FaultMachine").detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).backtrace();
+			TraceEvent(SevWarn, "FaultMachine").detail("Name", machine->name).detail("Address", machine->address).detailext("ZoneId", machine->locality.zoneId()).detail("Process", describe(*machine)).detail("Rebooting", machine->rebooting).backtrace();
 			should_inject_fault = simulator_should_inject_fault;
 			machine->fault_injection_r = g_random->randomUniqueID().first();
 			machine->fault_injection_p1 = 0.1;
@@ -1030,6 +1048,7 @@ public:
 		} else {
 			ASSERT( false );
 		}
+		ASSERT(!protectedAddresses.count(machine->address) || machine->rebooting);
 	}
 	virtual void rebootProcess( ProcessInfo* process, KillType kt ) {
 		if( kt == RebootProcessAndDelete && protectedAddresses.count(process->address) )
@@ -1096,25 +1115,31 @@ public:
 			for (auto processInfo : machineRec.second.processes) {
 				// Add non-test processes (ie. datahall is not set for test processes)
 				if (processInfo->startingClass != ProcessClass::TesterClass) {
+					// Add machine processes to dead group if dead or specified kill machine
+					if (processInfo->failed || (machineRec.second.zoneId == zoneId))
+						processesDead.push_back(processInfo);
+					else
+					// Do not kill protected processes
+					if (protectedAddresses.count(processInfo->address))
+						processesLeft.push_back(processInfo);
 					else if (processInfo->isAvailable() && (machineRec.second.zoneId != zoneId)) {
 						processesLeft.push_back(processInfo);
 					}
 					// Add processes from dead machines and datacenter machines to dead group
 					else
 						processesDead.push_back(processInfo);
 				}
 			}
-			if (!canKillProcesses(processesLeft, processesLeft, kt, &kt)) {
+			if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
 				if ((kt != Reboot) && (!killIsSafe)) {
 					kt = Reboot;
 				}
-				TraceEvent("ChangedKillMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalZones", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
+				TraceEvent("ChangedKillMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("OrigKillType", ktOrig).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
 			}
 			else if ((kt == KillInstantly) || (kt == InjectFaults)) {
-				TraceEvent("DeadMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalZones", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
+				TraceEvent("DeadMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
+				for (auto process : processesLeft) {
+					TraceEvent("DeadMachineSurvivors", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("SurvivingProcess", describe(*process));
+				}
 			}
 			else {
-				TraceEvent("ClearMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalZones", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
+				TraceEvent("ClearMachine", zoneId).detailext("ZoneId", zoneId).detail("KillType", kt).detail("ProcessesLeft", processesLeft.size()).detail("ProcessesDead", processesDead.size()).detail("TotalProcesses", machines.size()).detail("processesPerMachine", processesPerMachine).detail("tLogPolicy", tLogPolicy->info()).detail("storagePolicy", storagePolicy->info());
 			}
 		}
@@ -1125,7 +1150,7 @@ public:
 			return false;
 		}

-		TraceEvent("KillMachine", zoneId).detailext("ZoneId", zoneId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KilledMachines", killedMachines).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt == ktOrig).detail("killIsSafe", killIsSafe);
+		TraceEvent("KillMachine", zoneId).detailext("ZoneId", zoneId).detail("Kt", kt).detail("KtOrig", ktOrig).detail("KilledMachines", killedMachines).detail("KillableMachines", processesOnMachine).detail("ProcessPerMachine", processesPerMachine).detail("KillChanged", kt!=ktOrig).detail("killIsSafe", killIsSafe);
 		if (kt < RebootAndDelete ) {
 			for (auto& process : machines[zoneId].processes) {
 				TraceEvent("KillMachineProcess", zoneId).detail("KillType", kt).detail("Process", process->toString()).detail("startingClass", process->startingClass.toString());
@@ -1171,16 +1196,19 @@ public:
 				for (auto processInfo : machineRec.second.processes) {
 					// Add non-test processes (ie. datahall is not set for test processes)
 					if (processInfo->startingClass != ProcessClass::TesterClass) {
 						// Add processes from dead machines and datacenter machines to dead group
 						if (processInfo->failed || (datacenterZones.find(machineRec.second.zoneId) != datacenterZones.end()))
 							processesDead.push_back(processInfo);
 						else
+						// Do not kill protected processes
+						if (protectedAddresses.count(processInfo->address))
+							processesLeft.push_back(processInfo);
+						else if (processInfo->isAvailable() && (datacenterZones.find(machineRec.second.zoneId) == datacenterZones.end())) {
+							processesLeft.push_back(processInfo);
+						}
+						else
+							processesDead.push_back(processInfo);
 					}
 				}
 			}

-			if (!canKillProcesses(processesLeft, processesLeft, kt, &kt)) {
+			if (!canKillProcesses(processesLeft, processesDead, kt, &kt)) {
 				TraceEvent(SevWarn, "DcKillChanged").detailext("DataCenter", dcId).detail("KillType", ktOrig).detail("NewKillType", kt);
 			}
 			else {
@@ -1219,7 +1247,7 @@ public:
 	virtual void clogPair( uint32_t from, uint32_t to, double seconds ) {
 		g_clogging.clogPairFor( from, to, seconds );
 	}
-	virtual std::vector<ProcessInfo*> getAllProcesses() {
+	virtual std::vector<ProcessInfo*> getAllProcesses() const {
 		std::vector<ProcessInfo*> processes;
 		for( auto c = machines.begin(); c != machines.end(); ++c )
 			processes.insert( processes.end(), c->second.processes.begin(), c->second.processes.end() );
@@ -55,6 +55,7 @@ public:
 	TDMetricCollection tdmetrics;
 	Reference<IListener> listener;
 	bool failed;
+	bool excluded;
 	int64_t cpuTicks;
 	bool rebooting;
 	std::vector<flowGlobalType> globals;
@@ -67,14 +68,15 @@ public:
 	ProcessInfo(const char* name, LocalityData locality, ProcessClass startingClass, NetworkAddress address,
 				INetworkConnections *net, const char* dataFolder, const char* coordinationFolder )
 		: name(name), locality(locality), startingClass(startingClass), address(address), dataFolder(dataFolder),
-		  network(net), coordinationFolder(coordinationFolder), failed(false), cpuTicks(0),
+		  network(net), coordinationFolder(coordinationFolder), failed(false), excluded(false), cpuTicks(0),
 		  rebooting(false), fault_injection_p1(0), fault_injection_p2(0),
 		  fault_injection_r(0), machine(0)
 	{}

 	Future<KillType> onShutdown() { return shutdownSignal.getFuture(); }

-	bool isReliable() { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; }
+	bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; }
+	bool isAvailable() const { return !excluded && isReliable(); }

 	inline flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; };
 	inline void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; };
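The new excluded flag deliberately separates two notions: isReliable() (the process has not failed and has no pending fault injection) and isAvailable() (reliable and not currently excluded by a workload). A process excluded by the workload is therefore treated as unavailable for survivability math without being marked failed. Condensed from the lines above, with unrelated members elided:

	// Condensed view of the two predicates; only the fields they read are kept.
	struct ProcessInfoSketch {
		bool failed = false;
		bool excluded = false;
		double fault_injection_p1 = 0, fault_injection_p2 = 0;

		bool isReliable() const { return !failed && fault_injection_p1 == 0 && fault_injection_p2 == 0; }
		bool isAvailable() const { return !excluded && isReliable(); }
	};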
@@ -129,7 +131,8 @@ public:
 	virtual bool killMachine(Optional<Standalone<StringRef>> zoneId, KillType, bool killIsSafe = false, bool forceKill = false ) = 0;
 	virtual void killDataCenter(Optional<Standalone<StringRef>> dcId, KillType ) = 0;
 	//virtual KillType getMachineKillState( UID zoneID ) = 0;
-	virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) = 0;
+	virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const = 0;
+	virtual bool isAvailable() const = 0;

 	virtual void disableSwapToMachine(Optional<Standalone<StringRef>> zoneId ) {
 		swapsDisabled.insert(zoneId);
@@ -152,7 +155,7 @@ public:

 	virtual void clogInterface( uint32_t ip, double seconds, ClogMode mode = ClogDefault ) = 0;
 	virtual void clogPair( uint32_t from, uint32_t to, double seconds ) = 0;
-	virtual std::vector<ProcessInfo*> getAllProcesses() = 0;
+	virtual std::vector<ProcessInfo*> getAllProcesses() const = 0;
 	virtual ProcessInfo* getProcessByAddress( NetworkAddress const& address ) = 0;
 	virtual MachineInfo* getMachineByNetworkAddress(NetworkAddress const& address) = 0;
 	virtual MachineInfo* getMachineById(Optional<Standalone<StringRef>> const& zoneId) = 0;
@@ -215,6 +215,7 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 {
 	std::map<ProcessClass::Fitness, vector<std::pair<WorkerInterface, ProcessClass>>> fitness_workers;
 	std::vector<std::pair<WorkerInterface, ProcessClass>> results;
+	std::vector<LocalityData> unavailableLocals;
 	LocalitySetRef logServerSet;
 	LocalityMap<std::pair<WorkerInterface, ProcessClass>>* logServerMap;
 	bool bCompleted = false;
@@ -227,6 +228,24 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 		if( workerAvailable(it.second, checkStable) && !conf.isExcludedServer(it.second.interf.address()) && fitness != ProcessClass::NeverAssign ) {
 			fitness_workers[ fitness ].push_back(std::make_pair(it.second.interf, it.second.processClass));
 		}
+		else {
+			if (it.second.interf.locality.dataHallId().present())
+				TraceEvent(SevWarn,"GWFTADNotAvailable", id)
+					.detail("Fitness", fitness)
+					.detailext("Zone", it.second.interf.locality.zoneId())
+					.detailext("DataHall", it.second.interf.locality.dataHallId())
+					.detail("Address", it.second.interf.address())
+					.detail("workerAvailable", workerAvailable(it.second, checkStable))
+					.detail("isExcludedServer", conf.isExcludedServer(it.second.interf.address()))
+					.detail("checkStable", checkStable)
+					.detail("reboots", it.second.reboots)
+					.detail("isAvailable", IFailureMonitor::failureMonitor().getState(it.second.interf.storage.getEndpoint()).isAvailable())
+					.detail("Locality", it.second.interf.locality.toString())
+					.detail("tLogReplicationFactor", conf.tLogReplicationFactor)
+					.detail("tLogPolicy", conf.tLogPolicy ? conf.tLogPolicy->info() : "[unset]")
+					.detail("DesiredLogs", conf.getDesiredLogs());
+			unavailableLocals.push_back(it.second.interf.locality);
+		}
 	}

 	results.reserve(results.size() + id_worker.size());
@@ -316,11 +335,15 @@ std::vector<std::pair<WorkerInterface, ProcessClass>> getWorkersForTlogsAcrossDa
 			.detail("Policy", conf.tLogPolicy->info())
 			.detail("Processes", logServerSet->size())
 			.detail("Workers", id_worker.size())
+			.detail("FitnessGroups", fitness_workers.size())
 			.detail("TLogZones", ::describeZones(tLocalities))
 			.detail("TLogDataHalls", ::describeDataHalls(tLocalities))
+			.detail("MissingZones", ::describeZones(unavailableLocals))
+			.detail("MissingDataHalls", ::describeDataHalls(unavailableLocals))
 			.detail("Replication", conf.tLogReplicationFactor)
 			.detail("DesiredLogs", conf.getDesiredLogs())
 			.detail("RatingTests",SERVER_KNOBS->POLICY_RATING_TESTS)
+			.detail("checkStable", checkStable)
 			.detail("PolicyGenerations",SERVER_KNOBS->POLICY_GENERATIONS).backtrace();

 	// Free the set
@@ -1242,10 +1265,10 @@ void clusterRegisterMaster( ClusterControllerData* self, RegisterMasterRequest c
 void registerWorker( RegisterWorkerRequest req, ClusterControllerData *self ) {
 	WorkerInterface w = req.wi;
 	ProcessClass processClass = req.processClass;

-	TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString());
 	auto info = self->id_worker.find( w.locality.processId() );

+	TraceEvent("ClusterControllerActualWorkers", self->id).detail("WorkerID",w.id()).detailext("ProcessID", w.locality.processId()).detailext("ZoneId", w.locality.zoneId()).detailext("DataHall", w.locality.dataHallId()).detail("pClass", req.processClass.toString()).detail("Workers", self->id_worker.size()).detail("Registered", (info == self->id_worker.end() ? "False" : "True")).backtrace();

 	if( info == self->id_worker.end() ) {
 		auto classIter = self->id_class.find(w.locality.processId());
@@ -248,7 +248,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 		if(e.code() != error_code_actor_cancelled)
 			printf("SimulatedFDBDTerminated: %s\n", e.what());
 		ASSERT( destructed || g_simulator.getCurrentProcess() == process ); // simulatedFDBD catch called on different process
-		TraceEvent(e.code() == error_code_actor_cancelled || e.code() == error_code_file_not_found || destructed ? SevInfo : SevError, "SimulatedFDBDTerminated").error(e, true);
+		TraceEvent(e.code() == error_code_actor_cancelled || e.code() == error_code_file_not_found || destructed ? SevInfo : SevError, "SimulatedFDBDTerminated", localities.zoneId()).error(e, true);
 	}

 	TraceEvent("SimulatedFDBDDone", localities.zoneId()).detail("Cycles", cycles)
@@ -259,7 +259,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(
 		if (!onShutdown.isReady())
 			onShutdown = ISimulator::InjectFaults;
 	} catch (Error& e) {
-		TraceEvent(destructed ? SevInfo : SevError, "SimulatedFDBDRebooterError").error(e, true);
+		TraceEvent(destructed ? SevInfo : SevError, "SimulatedFDBDRebooterError", localities.zoneId()).error(e, true);
 		onShutdown = e;
 	}
@@ -737,13 +737,14 @@ void setupSimulatedSystem( vector<Future<Void>> *systemActors, std::string baseF
 			TraceEvent("SelectedCoordinator").detail("Address", coordinatorAddresses.back());
 		}
 	}

+	g_random->randomShuffle(coordinatorAddresses);
 	for(int i = 0; i < (coordinatorAddresses.size()/2)+1; i++) {
 		TraceEvent("ProtectMachine").detail("Address", coordinatorAddresses[i]).detail("Coordinators", coordinatorAddresses.size()).backtrace();
 		g_simulator.protectedAddresses.insert(coordinatorAddresses[i]);
 	}
 	g_random->randomShuffle(coordinatorAddresses);

 	ASSERT( coordinatorAddresses.size() == coordinatorCount );
 	ClusterConnectionString conn(coordinatorAddresses, LiteralStringRef("TestCluster:0"));
 	g_simulator.extraDB = extraDB ? new ClusterConnectionString(coordinatorAddresses, ((extraDB==1 && BUGGIFY) ? LiteralStringRef("TestCluster:0") : LiteralStringRef("ExtraCluster:0"))) : NULL;
@@ -920,7 +921,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
 	state int extraDB = checkExtraDB(testFile);

 	Void _ = wait( g_simulator.onProcess( g_simulator.newProcess(
-		"TestSystem", 0x01010101, 1, LocalityData(Optional<Standalone<StringRef>>(), Standalone<StringRef>(g_random->randomUniqueID().toString()), Optional<Standalone<StringRef>>(), Optional<Standalone<StringRef>>()), ProcessClass(), "", "" ), TaskDefaultYield ) );
+		"TestSystem", 0x01010101, 1, LocalityData(Optional<Standalone<StringRef>>(), Standalone<StringRef>(g_random->randomUniqueID().toString()), Optional<Standalone<StringRef>>(), Optional<Standalone<StringRef>>()), ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", "" ), TaskDefaultYield ) );
 	Sim2FileSystem::newFileSystem();
 	FlowTransport::createInstance(1);
 	simInitTLS();
@@ -37,9 +37,9 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 	double minDelay, maxDelay;
 	double kill1Timeout, kill2Timeout;

-	vector<AddressExclusion> toKill1, toKill2;
-	std::map<AddressExclusion, Optional<Standalone<StringRef>>> machine_ids;
-	std::map<AddressExclusion, std::set<AddressExclusion>> machineProcesses;
+	std::set<AddressExclusion> toKill1, toKill2;
+	std::map<AddressExclusion, Optional<Standalone<StringRef>>> machine_ids; // ip -> Locality Zone id
+	std::map<AddressExclusion, std::set<AddressExclusion>> machineProcesses; // ip -> ip:port

 	RemoveServersSafelyWorkload( WorkloadContext const& wcx )
 		: TestWorkload(wcx)
@@ -47,6 +47,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 		enabled = !clientId && g_network->isSimulated(); // only do this on the "first" client, and only when in simulation
 		minMachinesToKill = getOption( options, LiteralStringRef("minMachinesToKill"), 1 );
 		maxMachinesToKill = getOption( options, LiteralStringRef("maxMachinesToKill"), 10 );
+		maxMachinesToKill = std::max(minMachinesToKill, maxMachinesToKill);
 		minDelay = getOption( options, LiteralStringRef("minDelay"), 0.0 );
 		maxDelay = getOption( options, LiteralStringRef("maxDelay"), 60.0 );
 		kill1Timeout = getOption( options, LiteralStringRef("kill1Timeout"), 60.0 );
@@ -59,90 +60,65 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 		if( !enabled )
 			return Void();

-		std::map<Optional<Standalone<StringRef>>, AddressExclusion> machinesMap;
-		std::vector<AddressExclusion> processAddrs;
+		std::map<Optional<Standalone<StringRef>>, AddressExclusion> machinesMap; // Locality Zone Id -> ip address
+		std::vector<AddressExclusion> processAddrs; // if killProcesses, ip:port addresses; otherwise a unique list of the machines' ip addresses
 		std::map<uint32_t, Optional<Standalone<StringRef>>> ip_dcid;
 		auto processes = getServers();
 		for(auto& it : processes) {
+			AddressExclusion machineIp(it->address.ip);
+			AddressExclusion pAddr(it->address.ip, it->address.port);

-			AddressExclusion addr(it->address.ip);
+			processAddrs.push_back(pAddr);
+			TraceEvent("RemoveAndKill").detail("Step", "listAddresses")
+				.detail("Address", pAddr.toString()).detail("Process",describe(*it));
+			machineProcesses[machineIp].insert(pAddr);

-			if (killProcesses){
-				AddressExclusion pAddr(it->address.ip, it->address.port);
-				processAddrs.push_back(pAddr);
-			}
-			else {
-				machineProcesses[AddressExclusion(it->machine->machineProcess->address.ip)].insert(AddressExclusion(it->address.ip, it->address.port));
-			}
 			// add only one entry for each machine
 			if (!machinesMap.count(it->locality.zoneId()))
-				machinesMap[it->locality.zoneId()] = addr;
+				machinesMap[it->locality.zoneId()] = machineIp;

-			machine_ids[addr] = it->locality.zoneId();
+			machine_ids[machineIp] = it->locality.zoneId();
 			ip_dcid[it->address.ip] = it->locality.dcId();
 		}

-		if (!killProcesses){
-			for (auto m : machinesMap){
-				processAddrs.push_back(m.second);
-			}
-		}
-
-		// data centers can kill too many machines, such that when combined with the machines excluded by this workload we cannot change coordinators
-		if(g_network->isSimulated())
-			g_simulator.killableDatacenters = 0;
-
-		int nToKill1 = processAddrs.size();
-		nToKill1 = g_random->randomInt( std::min(nToKill1,minMachinesToKill), std::min(nToKill1,maxMachinesToKill)+1 );
-		int nToKill2 = std::max<int>(0, machinesMap.size() - g_simulator.killableMachines - std::max(g_simulator.machinesNeededForProgress, g_simulator.desiredCoordinators));
-		nToKill2 = g_random->randomInt( std::min(nToKill2,minMachinesToKill), std::min(nToKill2,maxMachinesToKill)+1 );
+		int processCount = processAddrs.size();
+		int nToKill1 = g_random->randomInt( std::min(processCount,minMachinesToKill), std::min(processCount,maxMachinesToKill)+1 );
+		int nToKill2 = g_random->randomInt( std::min(processCount,minMachinesToKill), std::min(processCount,maxMachinesToKill)+1 );
 		toKill1 = random_subset( processAddrs, nToKill1 );
-
-		loop {
-			toKill2 = random_subset( processAddrs, nToKill2 );
-			std::set<Optional<Standalone<StringRef>>> datacenters;
-			for(auto& addr : processAddrs)
-				if(std::find(toKill2.begin(), toKill2.end(), addr) == toKill2.end())
-					datacenters.insert(ip_dcid[addr.ip]);
-			if(datacenters.size() >= g_simulator.neededDatacenters) {
-				//FIXME: each machine kill could take down a datacenter after exclusion, so for now we need to lower killable machines
-				g_simulator.killableMachines = std::min(g_simulator.killableMachines, std::max<int>(0, datacenters.size() - g_simulator.neededDatacenters));
-				break;
-			}
-		}
+		toKill2 = random_subset( processAddrs, nToKill2 );

 		if (!killProcesses) {
-			std::vector<AddressExclusion> processKills;
+			std::set<AddressExclusion> processSet;

 			for (auto k1 : toKill1) {
-				ASSERT(machineProcesses.count(k1));
+				AddressExclusion machineIp(k1.ip);
+				ASSERT(machineProcesses.count(machineIp));
 				// kill all processes on this machine even if it has a different ip address
-				processKills.insert(processKills.end(), machineProcesses[k1].begin(), machineProcesses[k1].end());
+				std::copy(machineProcesses[machineIp].begin(), machineProcesses[machineIp].end(), std::inserter(processSet,processSet.end()));
 			}
+			toKill1.insert(processSet.begin(), processSet.end());

-			toKill1.insert(toKill1.end(), processKills.begin(), processKills.end());
-
-			processKills.clear();
+			processSet.clear();
 			for (auto k2 : toKill2) {
-				ASSERT(machineProcesses.count(k2));
-				processKills.insert(processKills.end(), machineProcesses[k2].begin(), machineProcesses[k2].end());
+				AddressExclusion machineIp(k2.ip);
+				ASSERT(machineProcesses.count(machineIp));
+				std::copy(machineProcesses[machineIp].begin(), machineProcesses[machineIp].end(), std::inserter(processSet,processSet.end()));
 			}

-			toKill2.insert(toKill2.end(), processKills.begin(), processKills.end());
+			toKill2.insert(processSet.begin(), processSet.end());
 		}

 		std::vector<NetworkAddress> disableAddrs1;
 		for( AddressExclusion ex : toKill1 ) {
-			AddressExclusion machine(ex.ip);
-			ASSERT(machine_ids.count(machine));
-			g_simulator.disableSwapToMachine(machine_ids[machine]);
+			AddressExclusion machineIp(ex.ip);
+			ASSERT(machine_ids.count(machineIp));
+			g_simulator.disableSwapToMachine(machine_ids[machineIp]);
 		}

 		std::vector<NetworkAddress> disableAddrs2;
 		for( AddressExclusion ex : toKill2 ) {
-			AddressExclusion machine(ex.ip);
-			ASSERT(machine_ids.count(machine));
-			g_simulator.disableSwapToMachine(machine_ids[machine]);
+			AddressExclusion machineIp(ex.ip);
+			ASSERT(machine_ids.count(machineIp));
+			g_simulator.disableSwapToMachine(machine_ids[machineIp]);
 		}

 		return Void();
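Switching toKill1/toKill2 from vectors to std::set makes the machine-to-process expansion above idempotent: inserting the same ip:port twice is a no-op, so a machine selected into both kill lists cannot produce duplicate exclusions. A self-contained sketch of the expansion step, using std::string as a stand-in for AddressExclusion:

	#include <map>
	#include <set>
	#include <string>

	using Addr = std::string; // stand-in for AddressExclusion

	// Expand a set of machine IPs into the full set of ip:port process
	// addresses on those machines; set semantics deduplicate automatically.
	std::set<Addr> expandToProcesses(std::set<Addr> const& machines,
	                                 std::map<Addr, std::set<Addr>> const& machineProcesses) {
		std::set<Addr> out = machines;
		for (Addr const& m : machines) {
			auto it = machineProcesses.find(m);
			if (it != machineProcesses.end())
				out.insert(it->second.begin(), it->second.end());
		}
		return out;
	}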
@@ -159,34 +135,163 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 	virtual void getMetrics( vector<PerfMetric>& ) {
 	}

+	virtual std::set<AddressExclusion> getNetworks(std::vector<ISimulator::ProcessInfo*> const& processes)
+	{
+		std::set<AddressExclusion> processAddrs;
+
+		for (auto& processInfo : processes) {
+			processAddrs.insert(AddressExclusion(processInfo->address.ip, processInfo->address.port));
+		}
+		return processAddrs;
+	}
+
+	virtual std::vector<ISimulator::ProcessInfo*> getProcesses(std::set<AddressExclusion> const& netAddrs)
+	{
+		std::vector<ISimulator::ProcessInfo*> processes;
+		std::set<AddressExclusion> processAddrs;
+
+		// Get the list of process network addresses
+		for (auto& netAddr : netAddrs) {
+			auto machineIpPorts = machineProcesses.find(netAddr);
+			if (machineIpPorts != machineProcesses.end()) {
+				ASSERT(machineIpPorts->second.size());
+				for (auto& processAddr : machineIpPorts->second)
+					processAddrs.insert(processAddr);
+			}
+			else {
+				processAddrs.insert(netAddr);
+			}
+		}
+		// Get the list of processes matching network address
+		for (auto processInfo : g_simulator.getAllProcesses()) {
+			auto processNet = AddressExclusion(processInfo->address.ip, processInfo->address.port);
+			if (processAddrs.find(processNet) != processAddrs.end())
+				processes.push_back(processInfo);
+		}
+		TraceEvent("RemoveAndKill").detail("Step", "getProcesses")
+			.detail("netAddrs",describe(netAddrs)).detail("processAddrs",describe(processAddrs))
+			.detail("Processes", processes.size()).detail("MachineProcesses", machineProcesses.size());
+
+		// Processes may have been destroyed, so the sizes may not match
+		// ASSERT(processAddrs.size() == processes.size());
+		return processes;
+	}
+
+	virtual std::vector<ISimulator::ProcessInfo*> protectServers(std::set<AddressExclusion> const& killAddrs)
+	{
+		std::vector<ISimulator::ProcessInfo*> processes;
+		std::set<AddressExclusion> processAddrs;
+		std::vector<ISimulator::ProcessInfo*> killProcesses, killableProcesses, processesLeft, processesDead;
+
+		// Get the list of processes matching network address
+		for (auto processInfo : getProcesses(killAddrs)) {
+			// Add non-test processes
+			if (processInfo->startingClass != ProcessClass::TesterClass) {
+				if (g_simulator.protectedAddresses.count(processInfo->address))
+					processesLeft.push_back(processInfo);
+				// Add reliable machine processes to available group
+				else if (processInfo->isAvailable())
+					processesLeft.push_back(processInfo);
+				else
+					processesDead.push_back(processInfo);
+			}
+		}
+		killProcesses = processesLeft;
+
+		// Identify the largest set of processes which can be killed
+		int kills, randomIndex;
+		for (int kills = 0; kills < killProcesses.size(); kills ++)
+		{
+			// Select a random kill process
+			randomIndex = g_random->randomInt(0, killProcesses.size());
+			processesDead.push_back(killProcesses[randomIndex]);
+			killProcesses[randomIndex] = killProcesses.back();
+			killProcesses.pop_back();
+			processesLeft.insert(processesLeft.end(), killProcesses.begin(), killProcesses.end());
+
+			if (g_simulator.canKillProcesses(processesLeft, processesLeft, ISimulator::KillInstantly, NULL)) {
+				killableProcesses.push_back(processesDead.back());
+			}
+			else {
+				processesLeft.push_back(processesDead.back());
+				processesDead.pop_back();
+			}
+			// Remove the added processes
+			processesLeft.resize(processesLeft.size() - killProcesses.size());
+		}
+
+		return killableProcesses;
+	}
+
 	ACTOR static Future<Void> workloadMain( RemoveServersSafelyWorkload* self, Database cx, double waitSeconds,
-			vector<AddressExclusion> toKill1, vector<AddressExclusion> toKill2 ) {
+			std::set<AddressExclusion> toKill1, std::set<AddressExclusion> toKill2 ) {
 		Void _ = wait( delay( waitSeconds ) );

 		// Removing the first set of machines might legitimately bring the database down, so a timeout is not an error
 		state std::vector<NetworkAddress> firstCoordinators;
-		Void _ = wait( timeout( removeAndKill( self, cx, toKill1, &firstCoordinators ), self->kill1Timeout, Void() ) );
+		state std::vector<ISimulator::ProcessInfo*> killProcesses;
+
+		TraceEvent("RemoveAndKill").detail("Step", "remove first list").detail("toKill1", describe(toKill1)).detail("KillTotal", toKill1.size())
+			.detail("ClusterAvailable", g_simulator.isAvailable());
+		Optional<Void> result = wait( timeout( removeAndKill( self, cx, toKill1, &firstCoordinators ), self->kill1Timeout ) );
+
+		TraceEvent("RemoveAndKill").detail("Step", "remove result").detail("result", result.present() ? "succeeded" : "failed");
+		killProcesses = self->getProcesses(toKill1);
+		TraceEvent("RemoveAndKill").detail("Step", "unexclude first processes").detail("toKill1", describe(toKill1))
+			.detail("KillTotal", toKill1.size()).detail("Processes", killProcesses.size());
+		for (auto& killProcess : killProcesses) {
+			killProcess->excluded = false;
+		}
+
+		// virtual bool killMachine(Optional<Standalone<StringRef>> zoneId, KillType, bool killIsSafe = false, bool forceKill = false ) = 0;
+		// virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) = 0;
+
+		// Identify the unique machines within the kill list
+		// Remove any machine with a protected process (ie a coordinator)
+		// Check if we can remove the machines and still have a healthy cluster
+		// If not, remove one random machine at a time until we have a healthy cluster
+		// Mark all of the processes protected, store the list of modified processes
+		// Exclude the machines
+		// Remove the machines; kill instantly
+		// Unprotect the changed processes
+		killProcesses = self->protectServers(toKill2);
+
+		// Update the kill networks to the killable processes
+		toKill2 = self->getNetworks(killProcesses);

 		// The second set of machines is selected so that we can always make progress without it, even after the permitted number of other permanent failures
 		// so we expect to succeed after a finite amount of time
 		state Future<Void> disabler = disableConnectionFailuresAfter( self->kill2Timeout/2, "RemoveServersSafely" );
-		Void _ = wait( reportErrors( timeoutError( removeAndKill( self, cx, toKill2, NULL, firstCoordinators, true ), self->kill2Timeout ), "RemoveServersSafelyError", UID() ) );
+		TraceEvent("RemoveAndKill").detail("Step", "remove second list").detail("toKill2", describe(toKill2)).detail("KillTotal", toKill2.size())
+			.detail("Processes", killProcesses.size()).detail("ClusterAvailable", g_simulator.isAvailable());
+		Void _ = wait( reportErrors( timeoutError( removeAndKill( self, cx, toKill2, NULL, firstCoordinators), self->kill2Timeout ), "RemoveServersSafelyError", UID() ) );
+
+		killProcesses = self->getProcesses(toKill2);
+		TraceEvent("RemoveAndKill").detail("Step", "unexclude second processes").detail("toKill2", describe(toKill2))
+			.detail("KillTotal", toKill2.size()).detail("Processes", killProcesses.size());
+		for (auto& killProcess : killProcesses) {
+			killProcess->excluded = false;
+		}
+
+		TraceEvent("RemoveAndKill").detail("Step", "completed final remove").detail("KillTotal", toKill2.size()).detail("Excluded", killProcesses.size())
+			.detail("ClusterAvailable", g_simulator.isAvailable());

 		return Void();
 	}

-	ACTOR static Future<Void> removeAndKill( RemoveServersSafelyWorkload* self, Database cx, vector<AddressExclusion> toKill, std::vector<NetworkAddress>* outSafeCoordinators, std::vector<NetworkAddress> firstCoordinators = std::vector<NetworkAddress>(), bool exitAfterInclude = false ) {
+	ACTOR static Future<Void> removeAndKill( RemoveServersSafelyWorkload* self, Database cx, std::set<AddressExclusion> toKill, std::vector<NetworkAddress>* outSafeCoordinators, std::vector<NetworkAddress> firstCoordinators = std::vector<NetworkAddress>())
+	{
 		// First clear the exclusion list and exclude the given list
-		TraceEvent("RemoveAndKill").detail("Step", "include all").detail("first", describe(firstCoordinators));
+		TraceEvent("RemoveAndKill").detail("Step", "include all").detail("coordinators", describe(firstCoordinators)).detail("ClusterAvailable", g_simulator.isAvailable());
 		Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );

-		// The actor's final boolean argument is a hack to prevent the second part of this function from happening
-		// Fix Me
-		if (exitAfterInclude) return Void();
+		TraceEvent("RemoveAndKill").detail("Step", "getting coordinators").detail("ClusterAvailable", g_simulator.isAvailable());

 		std::vector<NetworkAddress> coordinators = wait( getCoordinators(cx) );
 		state std::vector<NetworkAddress> safeCoordinators;

 		TraceEvent("RemoveAndKill").detail("Step", "got coordinators").detail("coordinators", describe(coordinators));

 		ASSERT(coordinators.size() > (g_simulator.desiredCoordinators-1)/2);

 		if(firstCoordinators.size()) {
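The comment block before protectServers describes a greedy search: tentatively kill one random process at a time and keep the choice only if the remaining processes still pass the survivability check. A simplified standalone version of that loop, with canSurvive standing in for g_simulator.canKillProcesses and std::mt19937 for g_random:

	#include <random>
	#include <vector>

	// Greedy selection of a maximal killable set: each candidate is tentatively
	// moved to the dead set and kept there only if the survivors still satisfy
	// the survivability predicate.
	template <class P, class F>
	std::vector<P> largestKillableSet(std::vector<P> candidates, F canSurvive, std::mt19937& rng) {
		std::vector<P> killable, alive;
		while (!candidates.empty()) {
			std::uniform_int_distribution<size_t> pick(0, candidates.size() - 1);
			size_t idx = pick(rng);
			P chosen = candidates[idx];
			candidates[idx] = candidates.back();   // swap-and-pop removal
			candidates.pop_back();

			std::vector<P> survivors = alive;      // everyone not chosen so far
			survivors.insert(survivors.end(), candidates.begin(), candidates.end());
			if (canSurvive(survivors))
				killable.push_back(chosen);        // safe to kill: keep it dead
			else
				alive.push_back(chosen);           // not safe: it must stay alive
		}
		return killable;
	}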
@@ -208,12 +313,14 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 			auto addr = firstCoordinators[idx];

 			int removedCount = 0;
-			for(int i = 0; i < toKill.size(); i++) {
-				if(self->killContainsProcess(toKill[i], firstCoordinators[idx])) {
-					std::swap(toKill[i--], toKill.back());
-					toKill.pop_back();
+			for (auto it = toKill.begin(); it != toKill.end(); ) {
+				if(self->killContainsProcess(*it, firstCoordinators[idx])) {
+					toKill.erase(it++);
 					removedCount++;
 				}
+				else {
+					it++;
+				}
 			}
 			if(removedCount >= 1) {
 				firstSafeCount++;
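Because toKill is now a std::set, the old swap-with-back-and-pop removal no longer applies; the loop above switches to the classic erase-while-iterating idiom, where erase(it++) advances the iterator before the erased node is invalidated. In isolation:

	#include <set>

	// The erase-while-iterating idiom used above. erase(it++) hands the current
	// iterator to erase() after it has already been advanced, so the loop never
	// touches an invalidated iterator. (Since C++11, it = s.erase(it) also works.)
	template <class T, class Pred>
	int eraseIf(std::set<T>& s, Pred shouldErase) {
		int removed = 0;
		for (auto it = s.begin(); it != s.end(); ) {
			if (shouldErase(*it)) {
				s.erase(it++);
				++removed;
			} else {
				++it;
			}
		}
		return removed;
	}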
@@ -241,14 +348,18 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 			auto addr = coordinators[idx];

 			int removedCount = 0;
-			for(int i = 0; i < toKill.size(); i++) {
-				if(self->killContainsProcess(toKill[i], coordinators[idx])) {
-					std::swap(toKill[i--], toKill.back());
-					toKill.pop_back();
+			for (auto it = toKill.begin(); it != toKill.end(); ) {
+				if(self->killContainsProcess(*it, coordinators[idx])) {
+					toKill.erase(it++);
 					removedCount++;
 				}
+				else {
+					it++;
+				}
 			}
-			if(removedCount >= 1) {
+			if (removedCount >= 1) {
 				TraceEvent("ProtectMachine").detail("Address", addr).detail("Coordinators", coordinators.size()).backtrace();
 				g_simulator.protectedAddresses.insert(addr);
+				safeCoordinators.push_back(addr);
 				safeCount++;
 			}
@@ -257,15 +368,33 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 			ASSERT(idx != startIndex || safeCount > (g_simulator.desiredCoordinators-1)/2);
 		}

-		TraceEvent("RemoveAndKill").detail("Step", "exclude").detail("Servers", describe(toKill)).detail("SafeCount", safeCount);
-		Void _ = wait( excludeServers( cx, toKill ) );
+		// The actor's final boolean argument was a hack to prevent the second part of this function from happening
+		// Fix Me
+		NOT_IN_CLEAN;
+
+		state std::vector<ISimulator::ProcessInfo*> killProcesses;
+
+		killProcesses = self->getProcesses(toKill);
+		TraceEvent("RemoveAndKill").detail("Step", "exclude").detail("Addresses", describe(toKill))
+			.detail("AddressTotal", toKill.size()).detail("Processes", killProcesses.size())
+			.detail("SafeCount", safeCount).detail("ClusterAvailable", g_simulator.isAvailable());
+		// ASSERT(toKill.size() == killProcesses.size());
+		for (auto& killProcess : killProcesses) {
+			killProcess->excluded = true;
+			TraceEvent("RemoveAndKill").detail("Step", "exclude process").detail("killProcess", describe(*killProcess));
+		}
+
+		state std::vector<AddressExclusion> toKillArray;
+		std::copy(toKill.begin(), toKill.end(), std::back_inserter(toKillArray));
+
+		Void _ = wait( excludeServers( cx, toKillArray ) );

 		// We need to skip at least the quorum change if there's nothing to kill, because there might not be enough servers left
 		// alive to do a coordinators auto (?)
 		if (toKill.size()) {
 			// Wait for removal to be safe
-			TraceEvent("RemoveAndKill").detail("Step", "wait").detail("Servers", describe(toKill));
-			Void _ = wait( waitForExcludedServers( cx, toKill ) );
+			TraceEvent("RemoveAndKill").detail("Step", "wait").detail("Addresses", describe(toKill));
+			Void _ = wait( waitForExcludedServers( cx, toKillArray ) );

 			// Change coordinators if necessary
 			TraceEvent("RemoveAndKill").detail("Step", "coordinators auto");
@@ -281,25 +410,40 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 				break;
 			}

+			TraceEvent("RemoveAndKill").detail("Step", "unexclude before clear").detail("Processes", killProcesses.size());
+			for (auto& killProcess : killProcesses) {
+				killProcess->excluded = false;
+			}
+
 			// Reboot and delete the servers
-			TraceEvent("RemoveAndKill").detail("Step", "RebootAndDelete").detail("Servers", describe(toKill));
-			for(auto a = toKill.begin(); a != toKill.end(); ++a) {
-				if( self->killProcesses ) {
-					TraceEvent("RemoveAndKill").detail("Step", "Kill Process").detail("Process", describe(*a));
-					g_simulator.rebootProcess( g_simulator.getProcessByAddress(NetworkAddress(a->ip, a->port, true, false)), ISimulator::RebootProcessAndDelete );
-				}
-				else {
-					TraceEvent("RemoveAndKill").detail("Step", "Kill Machine").detail("MachineAddr", describe(*a));
-					g_simulator.killMachine( self->machine_ids[ *a ], ISimulator::RebootAndDelete, true );
-				}
-			}
+			if( self->killProcesses ) {
+				TraceEvent("RemoveAndKill").detail("Step", "ClearProcesses").detail("Addresses", describe(toKill))
+					.detail("Processes", killProcesses.size()).detail("ClusterAvailable", g_simulator.isAvailable());
+				for (auto& killProcess : killProcesses) {
+					TraceEvent("RemoveAndKill").detail("Step", "Clear Process").detail("Process", describe(*killProcess)).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address) == 0);
+					g_simulator.rebootProcess( killProcess, ISimulator::RebootProcessAndDelete );
+				}
+			}
+			else {
+				std::set<Optional<Standalone<StringRef>>> zoneIds;
+				bool killedMachine;
+				for (auto& killProcess : killProcesses) {
+					zoneIds.insert(killProcess->locality.zoneId());
+				}
+				TraceEvent("RemoveAndKill").detail("Step", "ClearMachines").detail("Addresses", describe(toKill)).detail("Processes", killProcesses.size()).detail("Zones", zoneIds.size()).detail("ClusterAvailable", g_simulator.isAvailable());
+				for (auto& zoneId : zoneIds) {
+					killedMachine = g_simulator.killMachine( zoneId, ISimulator::RebootAndDelete, true );
+					TraceEvent(killedMachine ? SevInfo : SevWarn, "RemoveAndKill").detail("Step", "Clear Machine").detailext("ZoneId", zoneId).detail("Cleared", killedMachine).detail("ClusterAvailable", g_simulator.isAvailable());
+				}
+			}
 		}
 		else
 		{
-			TraceEvent("RemoveAndKill").detail("Step", "nothing to kill");
+			TraceEvent("RemoveAndKill").detail("Step", "nothing to clear");
 		}

-		TraceEvent("RemoveAndKill").detail("Step", "done");
+		TraceEvent("RemoveAndKill").detail("Step", "done")
+			.detail("ClusterAvailable", g_simulator.isAvailable());

 		return Void();
 	}
@@ -308,20 +452,22 @@ struct RemoveServersSafelyWorkload : TestWorkload {
 		vector<ISimulator::ProcessInfo*> machines;
 		vector<ISimulator::ProcessInfo*> all = g_simulator.getAllProcesses();
 		for(int i = 0; i < all.size(); i++)
-			if( !all[i]->failed && all[i]->name == std::string("Server") && all[i]->startingClass != ProcessClass::TesterClass )
+			if (all[i]->name == std::string("Server") && all[i]->startingClass != ProcessClass::TesterClass )
 				machines.push_back( all[i] );
 		return machines;
 	}

-	template <class T> static vector<T> random_subset( vector<T> v, int n ) {
+	template <class T> static std::set<T> random_subset( std::vector<T> v, int n ) {
+		std::set<T> subset;
 		// No, this isn't efficient!
 		g_random->randomShuffle(v);
 		v.resize(n);
-		return v;
+		std::copy(v.begin(), v.end(), std::inserter(subset,subset.end()));
+		return subset;
 	}

 	bool killContainsProcess(AddressExclusion kill, NetworkAddress process) {
-		return kill.excludes(process) || machineProcesses[kill].count(AddressExclusion(process.ip, process.port)) > 0;
+		return kill.excludes(process) || (machineProcesses.find(kill) != machineProcesses.end() && machineProcesses[kill].count(AddressExclusion(process.ip, process.port)) > 0);
 	}
 };
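random_subset keeps its shuffle-and-truncate approach but now returns a std::set, which is what lets the callers above union machine and process addresses without duplicates. An equivalent standalone version, with std::mt19937 standing in for g_random:

	#include <algorithm>
	#include <random>
	#include <set>
	#include <vector>

	// Shuffle-and-truncate sampling, as in random_subset above: O(v.size()) work
	// for an n-element sample ("No, this isn't efficient!"), returned as a set.
	template <class T>
	std::set<T> randomSubset(std::vector<T> v, int n, std::mt19937& rng) {
		std::shuffle(v.begin(), v.end(), rng);
		v.resize(n);
		return std::set<T>(v.begin(), v.end());
	}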