Log detailed information when the cluster controller considers a worker unavailable for TLog recruitment.

This commit is contained in:
Zhe Wu 2021-03-18 12:18:03 -07:00
parent 85d3e2fee5
commit 451b14af09
1 changed file with 48 additions and 11 deletions

View File

@ -337,19 +337,56 @@ public:
logServerSet = Reference<LocalitySet>(new LocalityMap<WorkerDetails>());
logServerMap = (LocalityMap<WorkerDetails>*)logServerSet.getPtr();
for (auto& it : id_worker) {
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), it.second.details.interf.id()) ==
// Records a worker as unavailable for TLog recruitment: appends the worker's locality to
// `unavailableLocals` and emits a trace event stating why the worker was rejected.
//
//   reason  - human-readable explanation, logged verbatim in the "Reason" field.
//   details - the rejected worker; its interface id, DC id and addresses are logged.
//   fitness - the worker's fitness for the TLog role as supplied by the caller; logged as-is.
//
// Captures `unavailableLocals` by reference, so the lambda must not outlive that container.
// `this` is captured for `id`, which tags the trace event (presumably the cluster controller's id).
auto logWorkerUnavailable = [this, &unavailableLocals](const std::string& reason,
                                                       const WorkerDetails& details,
                                                       ProcessClass::Fitness fitness) {
	unavailableLocals.push_back(details.interf.locality);
	// Note that the recruitment happens only during initial database creation and recovery. So these trace
	// events should be sparse.
	// TODO(zhewu): Add targeting dcids.
	TraceEvent("GetTLogTeamWorkerUnavailable", id)
	    .detail("Reason", reason)
	    .detail("WorkerID", details.interf.id())
	    .detail("WorkerDC", details.interf.locality.dcId())
	    .detail("Address", details.interf.addresses().toString())
	    // Key is PascalCase to match the other detail names in this event.
	    .detail("Fitness", fitness);
};
// Go through all the workers to list all the workers that can be recruited.
for (const auto& [worker_process_id, worker_info] : id_worker) {
const auto& worker_details = worker_info.details;
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), worker_details.interf.id()) !=
exclusionWorkerIds.end()) {
auto fitness = it.second.details.processClass.machineClassFitness(ProcessClass::TLog);
if (workerAvailable(it.second, checkStable) &&
!conf.isExcludedServer(it.second.details.interf.addresses()) &&
fitness != ProcessClass::NeverAssign &&
(!dcIds.size() || dcIds.count(it.second.details.interf.locality.dcId()))) {
fitness_workers[std::make_pair(fitness, it.second.details.degraded)].push_back(it.second.details);
} else {
unavailableLocals.push_back(it.second.details.interf.locality);
logWorkerUnavailable("Worker is excluded.", worker_details, ProcessClass::UnsetFit);
continue;
}
auto fitness = worker_details.processClass.machineClassFitness(ProcessClass::TLog);
if (!workerAvailable(worker_info, checkStable)) {
logWorkerUnavailable("Worker is not available.", worker_details, fitness);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable("Worker's server is excluded from the cluster.", worker_details, fitness);
continue;
}
if (fitness == ProcessClass::NeverAssign) {
logWorkerUnavailable("Worker's fitness is NeverAssign.", worker_details, fitness);
continue;
}
if (!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0) {
logWorkerUnavailable("Worker is not in the target DC.", worker_details, fitness);
continue;
}
// This worker is a candidate for TLog recruitment.
fitness_workers[std::make_pair(fitness, worker_details.degraded)].push_back(worker_details);
}
results.reserve(results.size() + id_worker.size());