Fix simulation issue where process switch was ignored
The simulator tracks only active processes. Rebooted or killed processes are removed from the list of processes, and are only added back once the process has rebooted and started up again. This causes a problem for the `RebootProcessAndSwitch` kill type, which is meant to simultaneously reboot all machines in a cluster and change their cluster file. If a machine is in the middle of a reboot when the signal is sent, its processes are not in the list and therefore miss the `RebootProcessAndSwitch` command. The fix is to add a check when a process is being started in simulation. If the process has had its cluster file changed and the simulation is in a state where all processes should have had their cluster files reverted to the original value, the simulator will now send a `RebootProcessAndSwitch` signal right when the process is started. This will cause an extra reboot, but should correctly switch the process back to its original, correct cluster file, allowing all clusters in the simulation to fully recover. Note that the above issue should only affect simulation, because it stems from how the simulator tracks processes and handles kill signals. This commit also adds a field to each process struct that records whether the process is being run in a DR cluster in the simulation run. This is needed because simulation does not differentiate between processes in different clusters (other than by IP address), and on a `RebootProcessAndSwitch` some processes need to switch clusters while others simply need to be rebooted.
This commit is contained in:
parent
f43011e4b7
commit
5ca2b89bdf
|
@ -2639,7 +2639,8 @@ TEST_CASE("/ManagementAPI/AutoQuorumChange/checkLocality") {
|
|||
ProcessClass(ProcessClass::CoordinatorClass, ProcessClass::CommandLineSource),
|
||||
"",
|
||||
"",
|
||||
currentProtocolVersion());
|
||||
currentProtocolVersion(),
|
||||
false);
|
||||
}
|
||||
|
||||
workers.push_back(data);
|
||||
|
|
|
@ -105,6 +105,7 @@ public:
|
|||
bool excluded;
|
||||
bool cleared;
|
||||
bool rebooting;
|
||||
bool drProcess;
|
||||
std::vector<flowGlobalType> globals;
|
||||
|
||||
INetworkConnections* network;
|
||||
|
@ -129,8 +130,8 @@ public:
|
|||
const char* coordinationFolder)
|
||||
: name(name), coordinationFolder(coordinationFolder), dataFolder(dataFolder), machine(nullptr),
|
||||
addresses(addresses), address(addresses.address), locality(locality), startingClass(startingClass),
|
||||
failed(false), excluded(false), cleared(false), rebooting(false), network(net), fault_injection_r(0),
|
||||
fault_injection_p1(0), fault_injection_p2(0), failedDisk(false) {
|
||||
failed(false), excluded(false), cleared(false), rebooting(false), drProcess(false), network(net),
|
||||
fault_injection_r(0), fault_injection_p1(0), fault_injection_p2(0), failedDisk(false) {
|
||||
uid = deterministicRandom()->randomUniqueID();
|
||||
}
|
||||
|
||||
|
@ -284,7 +285,8 @@ public:
|
|||
ProcessClass startingClass,
|
||||
const char* dataFolder,
|
||||
const char* coordinationFolder,
|
||||
ProtocolVersion protocol) = 0;
|
||||
ProtocolVersion protocol,
|
||||
bool drProcess) = 0;
|
||||
virtual void killProcess(ProcessInfo* machine, KillType) = 0;
|
||||
virtual void rebootProcess(Optional<Standalone<StringRef>> zoneId, bool allProcesses) = 0;
|
||||
virtual void rebootProcess(ProcessInfo* process, KillType kt) = 0;
|
||||
|
@ -393,7 +395,11 @@ public:
|
|||
}
|
||||
|
||||
void switchCluster(NetworkAddress const& address) { switchedCluster[address] = !switchedCluster[address]; }
|
||||
bool hasSwitchedCluster(NetworkAddress const& address) const { return switchedCluster.at(address); }
|
||||
bool hasSwitchedCluster(NetworkAddress const& address) const {
|
||||
return switchedCluster.find(address) != switchedCluster.end() ? switchedCluster.at(address) : false;
|
||||
}
|
||||
void toggleGlobalSwitchCluster() { globalSwitchedCluster = !globalSwitchedCluster; }
|
||||
bool globalHasSwitchedCluster() const { return globalSwitchedCluster; }
|
||||
|
||||
void excludeAddress(NetworkAddress const& address) {
|
||||
excludedAddresses[address]++;
|
||||
|
@ -546,6 +552,7 @@ private:
|
|||
std::map<NetworkAddress, int> excludedAddresses;
|
||||
std::map<NetworkAddress, int> clearedAddresses;
|
||||
std::map<NetworkAddress, bool> switchedCluster;
|
||||
bool globalSwitchedCluster = false;
|
||||
std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
|
||||
std::map<std::string, double> disabledMap;
|
||||
bool allSwapsDisabled;
|
||||
|
|
|
@ -1261,7 +1261,8 @@ public:
|
|||
ProcessClass startingClass,
|
||||
const char* dataFolder,
|
||||
const char* coordinationFolder,
|
||||
ProtocolVersion protocol) override {
|
||||
ProtocolVersion protocol,
|
||||
bool drProcess) override {
|
||||
ASSERT(locality.machineId().present());
|
||||
MachineInfo& machine = machines[locality.machineId().get()];
|
||||
if (!machine.machineId.present())
|
||||
|
@ -1311,6 +1312,7 @@ public:
|
|||
m->excluded = g_simulator->isExcluded(NetworkAddress(ip, port, true, false));
|
||||
m->cleared = g_simulator->isCleared(addresses.address);
|
||||
m->protocolVersion = protocol;
|
||||
m->drProcess = drProcess;
|
||||
|
||||
m->setGlobal(enTDMetrics, (flowGlobalType)&m->tdmetrics);
|
||||
if (FLOW_KNOBS->ENABLE_CHAOS_FEATURES) {
|
||||
|
@ -1324,7 +1326,8 @@ public:
|
|||
.detail("Address", m->address)
|
||||
.detail("MachineId", m->locality.machineId())
|
||||
.detail("Excluded", m->excluded)
|
||||
.detail("Cleared", m->cleared);
|
||||
.detail("Cleared", m->cleared)
|
||||
.detail("DrProcess", m->drProcess);
|
||||
|
||||
if (std::string(name) == "remote flow process") {
|
||||
protectedAddresses.insert(m->address);
|
||||
|
@ -1825,6 +1828,7 @@ public:
|
|||
}
|
||||
|
||||
int processesOnMachine = 0;
|
||||
bool isMainCluster = true; // false for machines running DR processes
|
||||
|
||||
KillType originalKt = kt;
|
||||
// Reboot if any of the processes are protected and count the number of processes not rebooting
|
||||
|
@ -1833,6 +1837,9 @@ public:
|
|||
kt = Reboot;
|
||||
if (!process->rebooting)
|
||||
processesOnMachine++;
|
||||
if (process->drProcess) {
|
||||
isMainCluster = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Do nothing, if no processes to kill
|
||||
|
@ -1959,14 +1966,12 @@ public:
|
|||
probe::context::sim2,
|
||||
probe::assert::simOnly);
|
||||
|
||||
if (processesOnMachine == processesPerMachine + 1 && originalKt == KillType::RebootProcessAndSwitch) {
|
||||
// Simulation runs which test DR add an extra process to each
|
||||
// machine in the original cluster. When killing processes with the
|
||||
// RebootProcessAndSwitch kill type, processes in the original
|
||||
// cluster should be rebooted in order to kill any zombie
|
||||
// processes.
|
||||
if (isMainCluster && originalKt == RebootProcessAndSwitch) {
|
||||
// When killing processes with the RebootProcessAndSwitch kill
|
||||
// type, processes in the original cluster should be rebooted in
|
||||
// order to kill any zombie processes.
|
||||
kt = KillType::Reboot;
|
||||
} else if (processesOnMachine != processesPerMachine) {
|
||||
} else if (processesOnMachine != processesPerMachine && kt != RebootProcessAndSwitch) {
|
||||
// Check if any processes on machine are rebooting
|
||||
CODE_PROBE(true,
|
||||
"Attempted reboot, but the target did not have all of its processes running",
|
||||
|
|
|
@ -170,7 +170,8 @@ ACTOR Future<int> spawnSimulated(std::vector<std::string> paramList,
|
|||
ProcessClass(ProcessClass::UnsetClass, ProcessClass::AutoSource),
|
||||
self->dataFolder.c_str(),
|
||||
self->coordinationFolder.c_str(), // do we need to customize this coordination folder path?
|
||||
self->protocolVersion);
|
||||
self->protocolVersion,
|
||||
false);
|
||||
wait(g_simulator->onProcess(child));
|
||||
state Future<ISimulator::KillType> onShutdown = child->onShutdown();
|
||||
state Future<ISimulator::KillType> parentShutdown = self->onShutdown();
|
||||
|
|
|
@ -625,7 +625,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
|
|||
AgentMode runBackupAgents,
|
||||
std::string whitelistBinPaths,
|
||||
ProtocolVersion protocolVersion,
|
||||
ConfigDBType configDBType) {
|
||||
ConfigDBType configDBType,
|
||||
bool isDr) {
|
||||
state ISimulator::ProcessInfo* simProcess = g_simulator->getCurrentProcess();
|
||||
state UID randomId = nondeterministicRandom()->randomUniqueID();
|
||||
state int cycles = 0;
|
||||
|
@ -645,7 +646,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
|
|||
.detail("Address", NetworkAddress(ip, port, true, false))
|
||||
.detail("ZoneId", localities.zoneId())
|
||||
.detail("WaitTime", waitTime)
|
||||
.detail("Port", port);
|
||||
.detail("Port", port)
|
||||
.detail("IsDr", isDr);
|
||||
|
||||
wait(delay(waitTime));
|
||||
|
||||
|
@ -658,7 +660,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
|
|||
processClass,
|
||||
dataFolder->c_str(),
|
||||
coordFolder->c_str(),
|
||||
protocolVersion);
|
||||
protocolVersion,
|
||||
isDr);
|
||||
wait(g_simulator->onProcess(
|
||||
process,
|
||||
TaskPriority::DefaultYield)); // Now switch execution to the process on which we will run
|
||||
|
@ -725,6 +728,16 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
|
|||
}
|
||||
|
||||
futures.push_back(success(onShutdown));
|
||||
if (!g_simulator->globalHasSwitchedCluster() && g_simulator->hasSwitchedCluster(process->address)) {
|
||||
// When switching machines between clusters, a simultaneous
|
||||
// reboot followed by a reboot and switch can cause the
|
||||
// reboot and switch to be ignored. Handle this case by
|
||||
// sending the reboot and switch kill type when the process
|
||||
// comes back online.
|
||||
TraceEvent("RebootProcessAndSwitchLateReboot").detail("Address", process->address);
|
||||
g_simulator->switchCluster(process->address);
|
||||
process->shutdownSignal.send(ISimulator::KillType::RebootProcessAndSwitch);
|
||||
}
|
||||
wait(waitForAny(futures));
|
||||
} catch (Error& e) {
|
||||
// If in simulation, if we make it here with an error other than io_timeout but enASIOTimedOut is set
|
||||
|
@ -840,7 +853,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
|
|||
.detail("KillType", shutdownResult)
|
||||
.detail("ConnectionString", connStr.toString())
|
||||
.detail("OtherConnectionString", otherConnStr.toString())
|
||||
.detail("SwitchingTo", g_simulator->hasSwitchedCluster(process->address));
|
||||
.detail("SwitchingTo", g_simulator->hasSwitchedCluster(process->address))
|
||||
.detail("MachineId", process->machine->machineId);
|
||||
|
||||
// Handle the case where otherConnStr is '@'.
|
||||
if (otherConnStr.toString().size() > 1) {
|
||||
|
@ -876,7 +890,8 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
|
|||
bool sslOnly,
|
||||
std::string whitelistBinPaths,
|
||||
ProtocolVersion protocolVersion,
|
||||
ConfigDBType configDBType) {
|
||||
ConfigDBType configDBType,
|
||||
bool isDr) {
|
||||
state int bootCount = 0;
|
||||
state std::vector<std::string> myFolders;
|
||||
state std::vector<std::string> coordFolders;
|
||||
|
@ -948,7 +963,8 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
|
|||
agentMode,
|
||||
whitelistBinPaths,
|
||||
protocolVersion,
|
||||
configDBType));
|
||||
configDBType,
|
||||
isDr));
|
||||
g_simulator->setDiffProtocol = true;
|
||||
} else {
|
||||
processes.push_back(simulatedFDBDRebooter(clusterFile,
|
||||
|
@ -967,7 +983,8 @@ ACTOR Future<Void> simulatedMachine(ClusterConnectionString connStr,
|
|||
agentMode,
|
||||
whitelistBinPaths,
|
||||
g_network->protocolVersion(),
|
||||
configDBType));
|
||||
configDBType,
|
||||
isDr));
|
||||
}
|
||||
TraceEvent("SimulatedMachineProcess", randomId)
|
||||
.detail("Address", NetworkAddress(ips[i], listenPort, true, false))
|
||||
|
@ -1344,7 +1361,8 @@ ACTOR Future<Void> restartSimulatedSystem(std::vector<Future<Void>>* systemActor
|
|||
usingSSL && (listenersPerProcess == 1 || processClass == ProcessClass::TesterClass),
|
||||
whitelistBinPaths,
|
||||
protocolVersion,
|
||||
configDBType),
|
||||
configDBType,
|
||||
false),
|
||||
processClass == ProcessClass::TesterClass ? "SimulatedTesterMachine" : "SimulatedMachine"));
|
||||
}
|
||||
|
||||
|
@ -2383,7 +2401,8 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
|
|||
sslOnly,
|
||||
whitelistBinPaths,
|
||||
protocolVersion,
|
||||
configDBType),
|
||||
configDBType,
|
||||
false),
|
||||
"SimulatedMachine"));
|
||||
|
||||
if (requiresExtraDBMachines) {
|
||||
|
@ -2413,7 +2432,8 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
|
|||
sslOnly,
|
||||
whitelistBinPaths,
|
||||
protocolVersion,
|
||||
configDBType),
|
||||
configDBType,
|
||||
true),
|
||||
"SimulatedMachine"));
|
||||
++cluster;
|
||||
}
|
||||
|
@ -2460,7 +2480,8 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
|
|||
sslOnly,
|
||||
whitelistBinPaths,
|
||||
protocolVersion,
|
||||
configDBType),
|
||||
configDBType,
|
||||
false),
|
||||
"SimulatedTesterMachine"));
|
||||
}
|
||||
|
||||
|
@ -2584,7 +2605,8 @@ ACTOR void setupAndRun(std::string dataFolder,
|
|||
ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource),
|
||||
"",
|
||||
"",
|
||||
currentProtocolVersion());
|
||||
currentProtocolVersion(),
|
||||
false);
|
||||
testSystem->excludeFromRestarts = true;
|
||||
wait(g_simulator->onProcess(testSystem, TaskPriority::DefaultYield));
|
||||
Sim2FileSystem::newFileSystem();
|
||||
|
|
|
@ -74,7 +74,8 @@ class WorkloadProcessState {
|
|||
ProcessClass(ProcessClass::TesterClass, ProcessClass::AutoSource),
|
||||
dataFolder.c_str(),
|
||||
parent->coordinationFolder.c_str(),
|
||||
parent->protocolVersion);
|
||||
parent->protocolVersion,
|
||||
false);
|
||||
self->childProcess->excludeFromRestarts = true;
|
||||
wait(g_simulator->onProcess(self->childProcess, TaskPriority::DefaultYield));
|
||||
try {
|
||||
|
|
|
@ -359,9 +359,12 @@ struct MachineAttritionWorkload : FailureInjectionWorkload {
|
|||
g_simulator->killDataHall(target, kt);
|
||||
} else if (!g_simulator->extraDatabases.empty() && deterministicRandom()->random01() < 0.1) {
|
||||
state ISimulator::KillType kt = ISimulator::RebootProcessAndSwitch;
|
||||
TraceEvent("Assassination").detail("KillType", kt);
|
||||
g_simulator->killAll(kt, true);
|
||||
g_simulator->toggleGlobalSwitchCluster();
|
||||
wait(delay(self->testDuration / 2));
|
||||
g_simulator->killAll(kt, true);
|
||||
g_simulator->toggleGlobalSwitchCluster();
|
||||
} else {
|
||||
state int killedMachines = 0;
|
||||
while (killedMachines < self->machinesToKill && self->machines.size() > self->machinesToLeave) {
|
||||
|
|
Loading…
Reference in New Issue