removed the coordinator check and added a pre-processing pass over the worker list rather than re-checking viability each kill cycle

Jon Fu 2019-10-23 10:37:38 -07:00
parent 6583c499f8
commit 47dc0ee25c
1 changed file with 13 additions and 32 deletions
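
In short, the change replaces a coordinator-safety check that ran on every kill cycle with a single pre-processing pass that filters the worker list up front. A minimal standalone sketch of that pattern, using illustrative types and names rather than the actual FoundationDB workload code:

#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical stand-in for WorkerDetails.
struct Worker {
    std::string id;
    bool isTester;
};

// Analogous to the simplified noSimIsViableKill(): a stateless predicate.
static bool isViableKill(const Worker& w) {
    return !w.isTester;
}

int main() {
    std::vector<Worker> allWorkers = {{"a", false}, {"b", true}, {"c", false}};

    // Pre-processing step: filter the list once, up front.
    std::vector<Worker> workers;
    std::copy_if(allWorkers.begin(), allWorkers.end(),
                 std::back_inserter(workers), isViableKill);

    // Later kill cycles pick from `workers` without re-checking viability.
    return workers.size() == 2 ? 0 : 1;
}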

@@ -141,14 +141,8 @@ struct MachineAttritionWorkload : TestWorkload {
     virtual void getMetrics( vector<PerfMetric>& m ) {
     }
 
-    static bool noSimIsViableKill(int coordFaultTolerance, int& killedCoord, std::vector<NetworkAddress> coordAddrs, WorkerDetails worker) {
+    static bool noSimIsViableKill(WorkerDetails worker) {
         if (worker.processClass == ProcessClass::ClassType::TesterClass) return false;
-        bool isCoord = (std::find(coordAddrs.begin(), coordAddrs.end(), worker.interf.address()) != coordAddrs.end());
-        if (isCoord && coordFaultTolerance > killedCoord) {
-            killedCoord++;
-        } else if (isCoord) {
-            return false;
-        }
         return true;
     }
 
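
For reference, the parameters dropped from the signature carried a coordinator-kill budget: the old predicate let the workload kill a coordinator only while a majority would still survive. A rough standalone sketch of that removed logic, with hypothetical names standing in for the FoundationDB types:

#include <algorithm>
#include <string>
#include <vector>

// Mirrors the removed predicate's shape; addresses are plain strings here.
static bool oldViableKill(int coordFaultTolerance, int& killedCoord,
                          const std::vector<std::string>& coordAddrs,
                          const std::string& workerAddr, bool isTester) {
    if (isTester) return false;
    bool isCoord = std::find(coordAddrs.begin(), coordAddrs.end(),
                             workerAddr) != coordAddrs.end();
    if (isCoord && coordFaultTolerance > killedCoord) {
        ++killedCoord;   // spend one unit of the kill budget
    } else if (isCoord) {
        return false;    // budget exhausted: this coordinator must survive
    }
    return true;
}

int main() {
    std::vector<std::string> coords = {"10.0.0.1", "10.0.0.2", "10.0.0.3"};
    int killed = 0;
    // With 3 coordinators the budget is (3 - 1) / 2 = 1 kill.
    bool first  = oldViableKill(1, killed, coords, "10.0.0.1", false); // true
    bool second = oldViableKill(1, killed, coords, "10.0.0.2", false); // false
    return (first && !second) ? 0 : 1;
}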
@@ -156,9 +150,8 @@ struct MachineAttritionWorkload : TestWorkload {
         ASSERT(!g_network->isSimulated());
         state int killedMachines = 0;
         state double delayBeforeKill = deterministicRandom()->random01() * meanDelay;
-        state std::vector<WorkerDetails> workers =
+        state std::vector<WorkerDetails> allWorkers =
             wait(self->dbInfo->get().clusterInterface.getWorkers.getReply(GetWorkersRequest()));
-        deterministicRandom()->randomShuffle(workers);
         // Can reuse reboot request to send to each interface since no reply promise needed
         state RebootRequest rbReq;
         if (self->reboot) {
@@ -166,32 +159,22 @@ struct MachineAttritionWorkload : TestWorkload {
         } else {
             rbReq.waitForDuration = std::numeric_limits<uint32_t>::max();
         }
-        // keep track of coordinator fault tolerance and make sure we don't go over
-        state ClientCoordinators coords(cx->getConnectionFile());
-        state std::vector<Future<Optional<LeaderInfo>>> leaderServers;
-        state std::vector<NetworkAddress> coordAddrs;
-        for (const auto& cls : coords.clientLeaderServers) {
-            leaderServers.push_back(retryBrokenPromise(cls.getLeader, GetLeaderRequest(coords.clusterKey, UID()), TaskPriority::CoordinationReply));
-            coordAddrs.push_back(cls.getLeader.getEndpoint().getPrimaryAddress());
-        }
-        wait(smartQuorum(leaderServers, leaderServers.size() / 2 + 1, 1.0));
-        int coordUnavailable = 0;
-        for (const auto& leaderServer : leaderServers) {
-            if (!leaderServer.isReady()) {
-                coordUnavailable++;
+        state std::vector<WorkerDetails> workers;
+        // Pre-processing step: remove all testers from list of workers
+        for (const auto& worker : allWorkers) {
+            if (noSimIsViableKill(worker)) {
+                workers.push_back(worker);
             }
         }
-        state int coordFaultTolerance = (leaderServers.size() - 1) / 2 - coordUnavailable;
-        state int killedCoord = 0;
         if (self->killDc) {
             wait(delay(delayBeforeKill));
             // Pick a dcId to kill
+            deterministicRandom()->randomShuffle(workers);
             Optional<Standalone<StringRef>> killDcId = workers.back().interf.locality.dcId();
             TraceEvent("Assassination").detail("TargetDataCenter", killDcId);
             for (const auto& worker : workers) {
-                // kill all matching dcId workers, except testers. Also preserve a majority of coordinators
-                if (worker.interf.locality.dcId().present() && worker.interf.locality.dcId() == killDcId &&
-                    noSimIsViableKill(coordFaultTolerance, killedCoord, coordAddrs, worker)) {
+                // kill all matching dcId workers
+                if (worker.interf.locality.dcId().present() && worker.interf.locality.dcId() == killDcId) {
                     worker.interf.clientInterface.reboot.send(rbReq);
                 }
             }
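
The removed block above probed the coordinators directly: it waited for a majority of leader servers (n / 2 + 1) to reply, counted the ones still not ready, and derived the remaining kill budget as (n - 1) / 2 minus the unavailable count. A tiny worked example of that arithmetic (standalone sketch, not the workload code):

#include <cassert>

int main() {
    int n = 5;                        // leaderServers.size()
    int quorum = n / 2 + 1;           // smartQuorum threshold: 3 of 5
    int coordUnavailable = 1;         // e.g. one coordinator never replied
    int coordFaultTolerance = (n - 1) / 2 - coordUnavailable; // 2 - 1 = 1
    assert(quorum == 3 && coordFaultTolerance == 1);
    return 0;
}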
@@ -217,11 +200,9 @@ struct MachineAttritionWorkload : TestWorkload {
                 }
             }
         }
-        // Pick a machine to kill, ignoring testers and preserving majority of coordinators
+        // Pick a machine to kill
         state WorkerDetails targetMachine;
-        while (!noSimIsViableKill(coordFaultTolerance, killedCoord, coordAddrs, workers.back())) {
-            deterministicRandom()->randomShuffle(workers);
-        }
+        deterministicRandom()->randomShuffle(workers);
         targetMachine = workers.back();
         TraceEvent("Assassination")
             .detail("TargetMachine", targetMachine.interf.locality.toString())
@@ -229,7 +210,7 @@ struct MachineAttritionWorkload : TestWorkload {
             .detail("KilledMachines", killedMachines)
             .detail("MachinesToKill", self->machinesToKill)
             .detail("MachinesToLeave", self->machinesToLeave)
-            .detail("Machines", self->machines.size());
+            .detail("Machines", workers.size());
         targetMachine.interf.clientInterface.reboot.send(rbReq);
         killedMachines++;
         workers.pop_back();