Notify processes joining the wrong cluster

And have these processes enter a "zombie" state where they cancel all
their actors and then wait forever, refusing to do any additional work
until they are manually handled by the operator.
This commit is contained in:
Lukas Joswiak 2022-10-17 11:56:19 -07:00
parent 2394a9f4b9
commit 743609f217
3 changed files with 26 additions and 23 deletions

View File

@ -1958,8 +1958,15 @@ public:
probe::context::sim2,
probe::assert::simOnly);
// Check if any processes on machine are rebooting
if (processesOnMachine != processesPerMachine && kt >= RebootAndDelete) {
if (processesOnMachine == processesPerMachine + 1 && originalKt == KillType::RebootProcessAndSwitch) {
// Simulation runs which test DR add an extra process to each
// machine in the original cluster. When killing processes with the
// RebootProcessAndSwitch kill type, processes in the original
// cluster should be rebooted in order to kill any zombie
// processes.
kt = KillType::Reboot;
} else if (processesOnMachine != processesPerMachine) {
// Check if any processes on machine are rebooting
CODE_PROBE(true,
"Attempted reboot, but the target did not have all of its processes running",
probe::context::sim2,
@ -1976,24 +1983,6 @@ public:
return false;
}
// Check if any processes on machine are rebooting
if (processesOnMachine != processesPerMachine) {
CODE_PROBE(true,
"Attempted reboot and kill, but the target did not have all of its processes running",
probe::context::sim2,
probe::assert::simOnly);
TraceEvent(SevWarn, "AbortedKill")
.detail("KillType", kt)
.detail("MachineId", machineId)
.detail("Reason", "Machine processes does not match number of processes per machine")
.detail("Processes", processesOnMachine)
.detail("ProcessesPerMachine", processesPerMachine)
.backtrace();
if (ktFinal)
*ktFinal = None;
return false;
}
TraceEvent("KillMachine")
.detail("MachineId", machineId)
.detail("Kt", kt)

View File

@ -1228,12 +1228,15 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
std::vector<NetworkAddress> coordinatorAddresses = wait(cs.tryResolveHostnames());
const WorkerInterface& w = req.wi;
if (req.clusterId.present() && self->clusterId->get().present() && req.clusterId != self->clusterId->get()) {
if (req.clusterId.present() && self->clusterId->get().present() && req.clusterId != self->clusterId->get() &&
req.processClass != ProcessClass::TesterClass) {
// TODO: Track invalid processes separately, report status in status json
TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "WorkerBelongsToExistingCluster", self->id)
.detail("WorkerClusterId", req.clusterId)
.detail("ClusterControllerClusterId", self->clusterId->get())
.detail("WorkerId", w.id())
.detail("ProcessId", w.locality.processId());
req.reply.sendError(invalid_cluster_id());
return Void();
}

View File

@ -686,6 +686,9 @@ ACTOR Future<Void> registrationClient(
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
when(wait(recovered)) { break; }
when(wait(clusterId->onChange())) {
break;
}
}
}
}
@ -2827,7 +2830,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
f.cancel();
state Error e = err;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled ||
e.code() == error_code_please_reboot_delete || e.code() == error_code_local_config_changed;
e.code() == error_code_please_reboot_delete || e.code() == error_code_local_config_changed ||
e.code() == error_code_invalid_cluster_id;
endRole(Role::WORKER, interf.id(), "WorkerError", ok, e);
errorForwarders.clear(false);
sharedLogs.clear();
@ -2864,6 +2868,7 @@ static std::set<int> const& normalWorkerErrors() {
s.insert(error_code_please_reboot);
s.insert(error_code_please_reboot_delete);
s.insert(error_code_local_config_changed);
s.insert(error_code_invalid_cluster_id);
}
return s;
}
@ -3645,7 +3650,13 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
// Otherwise, these actors may get a broken promise error.
for (auto f : actors)
f.cancel();
Error err = checkIOTimeout(e);
state Error err = checkIOTimeout(e);
if (e.code() == error_code_invalid_cluster_id) {
// If this process tried to join an invalid cluster, become a
// zombie and wait for manual action by the operator.
TraceEvent(g_network->isSimulated() ? SevWarnAlways : SevError, "ZombieProcess").error(e);
wait(Never());
}
throw err;
}
}