Added support for optionally re-including excluded servers

Removed unneeded code to protect coordinators
Ensured that simulation exclusion list is updated for excluded processes
This commit is contained in:
Alvin Moore 2017-06-19 16:51:07 -07:00
parent 9553458b78
commit 96f40d8eb0
1 changed files with 112 additions and 171 deletions

View File

@ -26,6 +26,9 @@
#include "fdbrpc/simulator.h"
#include "fdbclient/ManagementAPI.h"
const char* removeClearEnv = getenv("REMOVE_CLEAR");
int removeClear = removeClearEnv ? atoi(removeClearEnv) : 1;
template <>
std::string describe( uint32_t const& item ) {
return format("%d", item);
@ -68,9 +71,11 @@ struct RemoveServersSafelyWorkload : TestWorkload {
AddressExclusion machineIp(it->address.ip);
AddressExclusion pAddr(it->address.ip, it->address.port);
processAddrs.push_back(pAddr);
TraceEvent("RemoveAndKill").detail("Step", "listAddresses")
.detail("Address", pAddr.toString()).detail("Process",describe(*it));
if (g_simulator.protectedAddresses.count(it->address) == 0)
processAddrs.push_back(pAddr);
machineProcesses[machineIp].insert(pAddr);
// add only one entry for each machine
@ -181,43 +186,56 @@ struct RemoveServersSafelyWorkload : TestWorkload {
{
std::vector<ISimulator::ProcessInfo*> processes;
std::set<AddressExclusion> processAddrs;
std::vector<AddressExclusion> killableAddrs;
std::vector<ISimulator::ProcessInfo*> killProcesses, killableProcesses, processesLeft, processesDead;
// Get the list of processes matching network address
for (auto processInfo : getProcesses(killAddrs)) {
// Add non-test processes
if (processInfo->startingClass != ProcessClass::TesterClass) {
if (g_simulator.protectedAddresses.count(processInfo->address))
processesLeft.push_back(processInfo);
// Add reliable machine processes to available group
else if (processInfo->isAvailable())
processesLeft.push_back(processInfo);
else
processesDead.push_back(processInfo);
}
for (auto processInfo : getServers()) {
auto processNet = AddressExclusion(processInfo->address.ip, processInfo->address.port);
// Mark all of the unavailable as dead
if (!processInfo->isAvailable())
processesDead.push_back(processInfo);
// Save all processes not specified within set
else if (killAddrs.find(processNet) == killAddrs.end())
processesLeft.push_back(processInfo);
else
killProcesses.push_back(processInfo);
}
killProcesses = processesLeft;
// Identify the largest set of processes which can be killed
int kills, randomIndex;
for (int kills = 0; kills < killProcesses.size(); kills ++)
int randomIndex;
bool bCanKillProcess;
ISimulator::ProcessInfo* randomProcess;
auto deadProcess = processesDead.back();
for (int killsLeft = killProcesses.size(); killsLeft > 0; killsLeft --)
{
// Select a random kill process
randomIndex = g_random->randomInt(0, killProcesses.size());
processesDead.push_back(killProcesses[randomIndex]);
randomIndex = g_random->randomInt(0, killsLeft);
randomProcess = killProcesses[randomIndex];
processesDead.push_back(randomProcess);
killProcesses[randomIndex] = killProcesses.back();
killProcesses.pop_back();
// Add all of the remaining processes the leftover array
processesLeft.insert(processesLeft.end(), killProcesses.begin(), killProcesses.end());
if (g_simulator.canKillProcesses(processesLeft, processesLeft, ISimulator::KillInstantly, NULL)) {
killableProcesses.push_back(processesDead.back());
}
else {
processesLeft.push_back(processesDead.back());
processesDead.pop_back();
}
// Check if we can kill the added process
bCanKillProcess = g_simulator.canKillProcesses(processesLeft, processesDead, ISimulator::KillInstantly, NULL);
// Remove the added processes
processesLeft.resize(processesLeft.size() - killProcesses.size());
if (bCanKillProcess) {
killableProcesses.push_back(randomProcess);
killableAddrs.push_back(AddressExclusion(randomProcess->address.ip, randomProcess->address.port));
TraceEvent("RemoveAndKill").detail("Step", "identifyVictim")
.detail("VictimCount", killableAddrs.size()).detail("Victim",randomProcess->toString())
.detail("Victims", describe(killableAddrs));
}
// Move the process to the keep array
else {
processesLeft.push_back(randomProcess);
processesDead.pop_back();
}
}
return killableProcesses;
@ -231,193 +249,117 @@ struct RemoveServersSafelyWorkload : TestWorkload {
state std::vector<NetworkAddress> firstCoordinators;
state std::vector<ISimulator::ProcessInfo*> killProcesses;
TraceEvent("RemoveAndKill").detail("Step", "remove first list").detail("toKill1", describe(toKill1)).detail("KillTotal", toKill1.size())
TraceEvent("RemoveAndKill").detail("Step", "exclude first list").detail("toKill1", describe(toKill1)).detail("KillTotal", toKill1.size())
.detail("ClusterAvailable", g_simulator.isAvailable());
Optional<Void> result = wait( timeout( removeAndKill( self, cx, toKill1, &firstCoordinators ), self->kill1Timeout ) );
TraceEvent("RemoveAndKill").detail("Step", "remove result").detail("result", result.present() ? "succeeded" : "failed");
killProcesses = self->getProcesses(toKill1);
TraceEvent("RemoveAndKill").detail("Step", "mark first processes excluded").detail("Addresses", describe(toKill1))
.detail("AddressTotal", toKill1.size()).detail("Processes", killProcesses.size())
.detail("ClusterAvailable", g_simulator.isAvailable());
for (auto& killProcess : killProcesses) {
killProcess->excluded = true;
g_simulator.excludeAddress(killProcess->address);
TraceEvent("RemoveAndKill").detail("Step", "MarkProcessFirst").detail("Process", describe(*killProcess));
}
Optional<Void> result = wait( timeout( removeAndKill( self, cx, toKill1), self->kill1Timeout ) );
TraceEvent("RemoveAndKill").detail("Step", "first exclusion result").detail("result", result.present() ? "succeeded" : "failed");
killProcesses = self->getProcesses(toKill1);
TraceEvent("RemoveAndKill").detail("Step", "unexclude first processes").detail("toKill1", describe(toKill1))
TraceEvent("RemoveAndKill").detail("Step", "include first processes").detail("toKill1", describe(toKill1))
.detail("KillTotal", toKill1.size()).detail("Processes", killProcesses.size());
for (auto& killProcess : killProcesses) {
g_simulator.includeAddress(killProcess->address);
killProcess->excluded = false;
}
// virtual bool killMachine(Optional<Standalone<StringRef>> zoneId, KillType, bool killIsSafe = false, bool forceKill = false ) = 0;
// virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) = 0;
// Identify the unique machines within kill list
// Remove any machine with a protected process (ie coordinator)
// Check if we can remove the machines and still have a healthy cluster
// If not, remove one random machine at a time until we have healthy cluster
// Mark all of the processes protected, store the list of modified processes
// Exclude the machines
// Remove the machines Kill instantly
// Unprotect the changed processes
killProcesses = self->protectServers(toKill2);
// Update the kill networks to the killable processes
toKill2 = self->getNetworks(killProcesses);
TraceEvent("RemoveAndKill").detail("Step", "Mark second processes excluded").detail("toKill2", describe(toKill2))
.detail("KillTotal", toKill2.size()).detail("Processes", killProcesses.size());
for (auto& killProcess : killProcesses) {
killProcess->excluded = true;
g_simulator.excludeAddress(killProcess->address);
TraceEvent("RemoveAndKill").detail("Step", "MarkProcessSecond").detail("Processes", killProcesses.size()).detail("Process", describe(*killProcess));
}
// The second set of machines is selected so that we can always make progress without it, even after the permitted number of other permanent failures
// so we expect to succeed after a finite amount of time
state Future<Void> disabler = disableConnectionFailuresAfter( self->kill2Timeout/2, "RemoveServersSafely" );
TraceEvent("RemoveAndKill").detail("Step", "remove second list").detail("toKill2", describe(toKill2)).detail("KillTotal", toKill2.size())
TraceEvent("RemoveAndKill").detail("Step", "exclude second list").detail("toKill2", describe(toKill2)).detail("KillTotal", toKill2.size())
.detail("Processes", killProcesses.size()).detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( reportErrors( timeoutError( removeAndKill( self, cx, toKill2, NULL, firstCoordinators), self->kill2Timeout ), "RemoveServersSafelyError", UID() ) );
Void _ = wait( reportErrors( timeoutError( removeAndKill( self, cx, toKill2), self->kill2Timeout ), "RemoveServersSafelyError", UID() ) );
killProcesses = self->getProcesses(toKill2);
TraceEvent("RemoveAndKill").detail("Step", "unexclude second processes").detail("toKill2", describe(toKill2))
.detail("KillTotal", toKill2.size()).detail("Processes", killProcesses.size());
for (auto& killProcess : killProcesses) {
killProcess->excluded = false;
}
TraceEvent("RemoveAndKill").detail("Step", "completed final remove").detail("KillTotal", toKill2.size()).detail("Excluded", killProcesses.size())
TraceEvent("RemoveAndKill").detail("Step", "excluded second list").detail("KillTotal", toKill2.size()).detail("Excluded", killProcesses.size())
.detail("ClusterAvailable", g_simulator.isAvailable());
// Reinclude all of the machine, if buggified
if (BUGGIFY) {
TraceEvent("RemoveAndKill").detail("Step", "final include all").detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );
for (auto& killProcess : killProcesses) {
g_simulator.includeAddress(killProcess->address);
killProcess->excluded = false;
}
TraceEvent("RemoveAndKill").detail("Step", "final included all").detail("ClusterAvailable", g_simulator.isAvailable());
}
return Void();
}
ACTOR static Future<Void> removeAndKill( RemoveServersSafelyWorkload* self, Database cx, std::set<AddressExclusion> toKill, std::vector<NetworkAddress>* outSafeCoordinators, std::vector<NetworkAddress> firstCoordinators = std::vector<NetworkAddress>())
ACTOR static Future<Void> removeAndKill( RemoveServersSafelyWorkload* self, Database cx, std::set<AddressExclusion> toKill)
{
// First clear the exclusion list and exclude the given list
TraceEvent("RemoveAndKill").detail("Step", "include all").detail("coordinators", describe(firstCoordinators)).detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill").detail("Step", "include all").detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( includeServers( cx, vector<AddressExclusion>(1) ) );
TraceEvent("RemoveAndKill").detail("Step", "getting coordinators").detail("ClusterAvailable", g_simulator.isAvailable());
std::vector<NetworkAddress> coordinators = wait( getCoordinators(cx) );
state std::vector<NetworkAddress> safeCoordinators;
TraceEvent("RemoveAndKill").detail("Step", "got coordinators").detail("coordinators", describe(coordinators));
ASSERT(coordinators.size() > (g_simulator.desiredCoordinators-1)/2);
if(firstCoordinators.size()) {
ASSERT(firstCoordinators.size() > (g_simulator.desiredCoordinators-1)/2);
//check that at least one coordinator from the first set is not excluded.
int firstSafeCount = 0;
for( auto it : firstCoordinators ) {
RemoveServersSafelyWorkload *tSelf = self;
if(std::none_of(toKill.begin(), toKill.end(), [tSelf, it](AddressExclusion exclusion){ return tSelf->killContainsProcess(exclusion, it); })) {
++firstSafeCount;
}
}
int idx = g_random->randomInt(0, firstCoordinators.size());
int startIndex = idx;
while(firstSafeCount <= (g_simulator.desiredCoordinators-1)/2 ) {
//remove a random coordinator from the kill list
auto addr = firstCoordinators[idx];
int removedCount = 0;
for (auto it = toKill.begin(); it != toKill.end(); ) {
if(self->killContainsProcess(*it, firstCoordinators[idx])) {
toKill.erase(it++);
removedCount++;
}
else {
it ++;
}
}
if(removedCount >= 1) {
firstSafeCount++;
}
idx = (idx + 1) % firstCoordinators.size();
ASSERT(idx != startIndex || firstSafeCount > (g_simulator.desiredCoordinators-1)/2);
}
}
//check that at least one coordinator is not excluded.
int safeCount = 0;
for( auto it : coordinators ) {
RemoveServersSafelyWorkload *tSelf = self;
if(std::none_of(toKill.begin(), toKill.end(), [tSelf, it](AddressExclusion exclusion){ return tSelf->killContainsProcess(exclusion, it); })) {
safeCoordinators.push_back(it);
++safeCount;
}
}
int idx = g_random->randomInt(0, coordinators.size());
int startIndex = idx;
while(safeCount <= (g_simulator.desiredCoordinators-1)/2) {
//remove a random coordinator from the kill list
auto addr = coordinators[idx];
int removedCount = 0;
for (auto it = toKill.begin(); it != toKill.end(); ) {
if(self->killContainsProcess(*it, coordinators[idx])) {
toKill.erase(it++);
removedCount++;
}
else {
it ++;
}
}
if (removedCount >= 1) {
TraceEvent("ProtectMachine").detail("Address", addr).detail("Coordinators", coordinators.size()).backtrace();
g_simulator.protectedAddresses.insert(NetworkAddress(addr.ip,addr.port,true,false));
safeCoordinators.push_back(addr);
safeCount++;
}
idx = (idx + 1) % coordinators.size();
ASSERT(idx != startIndex || safeCount > (g_simulator.desiredCoordinators-1)/2);
}
TraceEvent("RemoveAndKill").detail("Step", "included all").detail("ClusterAvailable", g_simulator.isAvailable());
state std::vector<ISimulator::ProcessInfo*> killProcesses;
killProcesses = self->getProcesses(toKill);
TraceEvent("RemoveAndKill").detail("Step", "exclude").detail("Addresses", describe(toKill))
.detail("AddressTotal", toKill.size()).detail("Processes", killProcesses.size())
.detail("SafeCount", safeCount).detail("ClusterAvailable", g_simulator.isAvailable());
// ASSERT(toKill.size() == killProcesses.size());
for (auto& killProcess : killProcesses) {
killProcess->excluded = true;
TraceEvent("RemoveAndKill").detail("Step", "exclude process").detail("killProcess", describe(*killProcess));
}
state std::vector<AddressExclusion> toKillArray;
std::copy(toKill.begin(), toKill.end(), std::back_inserter(toKillArray));
std::copy(toKill.begin(), toKill.end(), std::back_inserter(toKillArray));
killProcesses = self->getProcesses(toKill);
TraceEvent("RemoveAndKill").detail("Step", "Activate Server Exclusion").detail("toKill", describe(toKill)).detail("Addresses", describe(toKillArray)).detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( excludeServers( cx, toKillArray ) );
// We need to skip at least the quorum change if there's nothing to kill, because there might not be enough servers left
// alive to do a coordinators auto (?)
if (toKill.size()) {
// Wait for removal to be safe
TraceEvent("RemoveAndKill").detail("Step", "wait").detail("Addresses", describe(toKill));
TraceEvent("RemoveAndKill").detail("Step", "Wait For Server Exclusion").detail("Addresses", describe(toKill)).detail("ClusterAvailable", g_simulator.isAvailable());
Void _ = wait( waitForExcludedServers( cx, toKillArray ) );
// Change coordinators if necessary
TraceEvent("RemoveAndKill").detail("Step", "coordinators auto");
if(outSafeCoordinators != NULL) {
for(auto it : safeCoordinators) {
outSafeCoordinators->push_back(it);
}
}
TraceEvent("RemoveAndKill").detail("Step", "coordinators auto").detail("desiredCoordinators", g_simulator.desiredCoordinators).detail("ClusterAvailable", g_simulator.isAvailable());
// Setup the coordinators BEFORE the exclusion
// Otherwise, we may end up with NotEnoughMachinesForCoordinators
state int cycle=0;
state int nQuorum;
while (true) {
CoordinatorsResult::Type result = wait( changeQuorum( cx, autoQuorumChange(((g_simulator.desiredCoordinators+1)/2)*2-1) ) );
TraceEvent(result==CoordinatorsResult::SUCCESS || result==CoordinatorsResult::SAME_NETWORK_ADDRESSES ? SevInfo : SevWarn, "RemoveAndKillQuorumChangeResult").detail("Step", "coordinators auto").detail("Result", (int)result);
cycle ++;
nQuorum = ((g_simulator.desiredCoordinators+1)/2)*2-1;
CoordinatorsResult::Type result = wait( changeQuorum( cx, autoQuorumChange(nQuorum) ) );
TraceEvent(result==CoordinatorsResult::SUCCESS || result==CoordinatorsResult::SAME_NETWORK_ADDRESSES ? SevInfo : SevWarn, "RemoveAndKillQuorumChangeResult").detail("Step", "coordinators auto").detail("Result", (int)result).detail("attempt", cycle).detail("Quorum", nQuorum).detail("desiredCoordinators", g_simulator.desiredCoordinators);
if (result==CoordinatorsResult::SUCCESS || result==CoordinatorsResult::SAME_NETWORK_ADDRESSES)
break;
}
TraceEvent("RemoveAndKill").detail("Step", "unexclude before clear").detail("Processes", killProcesses.size());
for (auto& killProcess : killProcesses) {
killProcess->excluded = false;
}
// Reboot and delete the servers
// Reboot and delete or kill the servers
if( self->killProcesses ) {
TraceEvent("RemoveAndKill").detail("Step", "ClearProcesses").detail("Addresses", describe(toKill))
TraceEvent("RemoveAndKill").detail("Step", removeClear ? "ClearProcesses" : "KillProcesses").detail("Addresses", describe(toKill))
.detail("Processes", killProcesses.size()).detail("ClusterAvailable", g_simulator.isAvailable());
for (auto& killProcess : killProcesses) {
TraceEvent("RemoveAndKill").detail("Step", "Clear Process").detail("Process", describe(*killProcess)).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address) == 0);
g_simulator.rebootProcess( killProcess, ISimulator::RebootProcessAndDelete );
TraceEvent("RemoveAndKill").detail("Step", removeClear ? "Clear Process" : "Kill Process").detail("Process", describe(*killProcess)).detail("ClusterAvailable", g_simulator.isAvailable()).detail("Protected", g_simulator.protectedAddresses.count(killProcess->address));
// ASSERT(g_simulator.protectedAddresses.count(killProcess->address) == 0);
if (removeClear)
g_simulator.rebootProcess( killProcess, ISimulator::RebootProcessAndDelete);
else
g_simulator.killProcess( killProcess, ISimulator::KillInstantly );
}
}
else {
@ -426,20 +368,19 @@ struct RemoveServersSafelyWorkload : TestWorkload {
for (auto& killProcess : killProcesses) {
zoneIds.insert(killProcess->locality.zoneId());
}
TraceEvent("RemoveAndKill").detail("Step", "ClearMachines").detail("Addresses", describe(toKill)).detail("Processes", killProcesses.size()).detail("Zones", zoneIds.size()).detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill").detail("Step", removeClear ? "ClearMachines" : "KillMachines").detail("Addresses", describe(toKill)).detail("Processes", killProcesses.size()).detail("Zones", zoneIds.size()).detail("ClusterAvailable", g_simulator.isAvailable());
for (auto& zoneId : zoneIds) {
killedMachine = g_simulator.killMachine( zoneId, ISimulator::RebootAndDelete, true );
TraceEvent(killedMachine ? SevInfo : SevWarn, "RemoveAndKill").detail("Step", "Clear Machine").detailext("ZoneId", zoneId).detail("Cleared", killedMachine).detail("ClusterAvailable", g_simulator.isAvailable());
killedMachine = g_simulator.killMachine( zoneId, removeClear ? ISimulator::RebootAndDelete : ISimulator::KillInstantly, removeClear ? true : false );
TraceEvent(killedMachine ? SevInfo : SevWarn, "RemoveAndKill").detail("Step", removeClear ? "Clear Machine" : "Kill Machine").detailext("ZoneId", zoneId).detail(removeClear ? "Cleared" : "Killed", killedMachine).detail("ClusterAvailable", g_simulator.isAvailable());
}
}
}
else
{
TraceEvent("RemoveAndKill").detail("Step", "nothing to clear");
TraceEvent("RemoveAndKill").detail("Step", "nothing to clear").detail("ClusterAvailable", g_simulator.isAvailable());
}
TraceEvent("RemoveAndKill").detail("Step", "done")
.detail("ClusterAvailable", g_simulator.isAvailable());
TraceEvent("RemoveAndKill").detail("Step", "done").detail("ClusterAvailable", g_simulator.isAvailable());
return Void();
}
@ -448,7 +389,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
vector<ISimulator::ProcessInfo*> machines;
vector<ISimulator::ProcessInfo*> all = g_simulator.getAllProcesses();
for(int i = 0; i < all.size(); i++)
if (all[i]->name == std::string("Server") && all[i]->startingClass != ProcessClass::TesterClass )
if (all[i]->name == std::string("Server") && all[i]->isAvailableClass())
machines.push_back( all[i] );
return machines;
}