Bring #4518 (Logging more detailed information during TLog recruitment) back.

This commit is contained in:
RenxuanW 2021-05-13 12:20:31 -07:00
parent 061afda2ec
commit 8a15d7d14b
1 changed files with 94 additions and 16 deletions

View File

@ -458,6 +458,33 @@ public:
}
}
// Emit a "GetTLogTeamWorkerUnavailable" trace event explaining why a candidate worker was skipped
// during TLog recruitment.
//   id      - UID the trace event is attributed to.
//   method  - label of the recruitment method doing the filtering (logged as "TLogRecruitMethod").
//   reason  - human-readable explanation of why the worker was rejected (logged as "Reason").
//   details - the rejected worker; its interface id, DC id and addresses are logged.
//   fitness - the fitness value the caller computed for this worker (logged as "Fitness").
//   dcIds   - the set of DC ids passed by the caller; logged as one comma-separated string.
void logWorkerUnavailable(const UID& id,
                          const std::string& method,
                          const std::string& reason,
                          const WorkerDetails& details,
                          const ProcessClass::Fitness& fitness,
                          const std::set<Optional<Key>>& dcIds) {
	// Flatten the DC ids into a single comma-separated string; it is only used for logging.
	std::string dcList;
	for (auto dcIt = dcIds.begin(); dcIt != dcIds.end(); ++dcIt) {
		// A separator is only needed once something has already been written.
		if (!dcList.empty()) {
			dcList.push_back(',');
		}
		dcList += printable(*dcIt);
	}

	const auto& workerInterf = details.interf;

	// Recruitment only takes place during initial database creation and recovery, so these
	// trace events are expected to be sparse.
	TraceEvent unavailableEvent("GetTLogTeamWorkerUnavailable", id);
	unavailableEvent.detail("TLogRecruitMethod", method);
	unavailableEvent.detail("Reason", reason);
	unavailableEvent.detail("WorkerID", workerInterf.id());
	unavailableEvent.detail("WorkerDC", workerInterf.locality.dcId());
	unavailableEvent.detail("Address", workerInterf.addresses().toString());
	unavailableEvent.detail("Fitness", fitness);
	unavailableEvent.detail("RecruitmentDcIds", dcList);
}
// A TLog recruitment method specialized for three_data_hall and three_datacenter configurations.
// It attempts to recruit processes evenly from across data_halls or datacenters.
std::vector<WorkerDetails> getWorkersForTlogsComplex(DatabaseConfiguration const& conf,
@ -478,11 +505,30 @@ public:
auto fitness = worker_details.processClass.machineClassFitness(ProcessClass::TLog);
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), worker_details.interf.id()) !=
exclusionWorkerIds.end() ||
!workerAvailable(worker_info, checkStable) ||
conf.isExcludedServer(worker_details.interf.addresses()) || fitness == ProcessClass::NeverAssign ||
(!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0) ||
(!allowDegraded && worker_details.degraded)) {
exclusionWorkerIds.end()) {
logWorkerUnavailable(id, "complex", "Worker is excluded", worker_details, fitness, dcIds);
continue;
}
if (!workerAvailable(worker_info, checkStable)) {
logWorkerUnavailable(id, "complex", "Worker is not available", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable(
id, "complex", "Worker server is excluded from the cluster", worker_details, fitness, dcIds);
continue;
}
if (fitness == ProcessClass::NeverAssign) {
logWorkerUnavailable(id, "complex", "Worker's fitness is NeverAssign", worker_details, fitness, dcIds);
continue;
}
if (!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0) {
logWorkerUnavailable(id, "complex", "Worker is not in the target DC", worker_details, fitness, dcIds);
continue;
}
if (!allowDegraded && worker_details.degraded) {
logWorkerUnavailable(
id, "complex", "Worker is degraded and not allowed", worker_details, fitness, dcIds);
continue;
}
@ -686,10 +732,25 @@ public:
const auto& worker_details = worker_info.details;
auto fitness = worker_details.processClass.machineClassFitness(ProcessClass::TLog);
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), worker_details.interf.id()) !=
exclusionWorkerIds.end() ||
!workerAvailable(worker_info, checkStable) ||
conf.isExcludedServer(worker_details.interf.addresses()) || fitness == ProcessClass::NeverAssign ||
(!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0)) {
exclusionWorkerIds.end()) {
logWorkerUnavailable(id, "simple", "Worker is excluded", worker_details, fitness, dcIds);
continue;
}
if (!workerAvailable(worker_info, checkStable)) {
logWorkerUnavailable(id, "simple", "Worker is not available", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable(
id, "simple", "Worker server is excluded from the cluster", worker_details, fitness, dcIds);
continue;
}
if (fitness == ProcessClass::NeverAssign) {
logWorkerUnavailable(id, "simple", "Worker's fitness is NeverAssign", worker_details, fitness, dcIds);
continue;
}
if (!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0) {
logWorkerUnavailable(id, "simple", "Worker is not in the target DC", worker_details, fitness, dcIds);
continue;
}
@ -795,10 +856,27 @@ public:
const auto& worker_details = worker_info.details;
auto fitness = worker_details.processClass.machineClassFitness(ProcessClass::TLog);
if (std::find(exclusionWorkerIds.begin(), exclusionWorkerIds.end(), worker_details.interf.id()) !=
exclusionWorkerIds.end() ||
!workerAvailable(worker_info, checkStable) ||
conf.isExcludedServer(worker_details.interf.addresses()) || fitness == ProcessClass::NeverAssign ||
(!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0)) {
exclusionWorkerIds.end()) {
logWorkerUnavailable(id, "deprecated", "Worker is excluded", worker_details, fitness, dcIds);
continue;
}
if (!workerAvailable(worker_info, checkStable)) {
logWorkerUnavailable(id, "deprecated", "Worker is not available", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable(
id, "deprecated", "Worker server is excluded from the cluster", worker_details, fitness, dcIds);
continue;
}
if (fitness == ProcessClass::NeverAssign) {
logWorkerUnavailable(
id, "deprecated", "Worker's fitness is NeverAssign", worker_details, fitness, dcIds);
continue;
}
if (!dcIds.empty() && dcIds.count(worker_details.interf.locality.dcId()) == 0) {
logWorkerUnavailable(
id, "deprecated", "Worker is not in the target DC", worker_details, fitness, dcIds);
continue;
}
@ -3091,9 +3169,9 @@ ACTOR Future<Void> workerAvailabilityWatch(WorkerInterface worker,
cluster->masterProcessId = Optional<Key>();
}
TraceEvent("ClusterControllerWorkerFailed", cluster->id)
.detail("ProcessId", worker.locality.processId())
.detail("ProcessClass", failedWorkerInfo.details.processClass.toString())
.detail("Address", worker.address());
.detail("ProcessId", worker.locality.processId())
.detail("ProcessClass", failedWorkerInfo.details.processClass.toString())
.detail("Address", worker.address());
cluster->removedDBInfoEndpoints.insert(worker.updateServerDBInfo.getEndpoint());
cluster->id_worker.erase(worker.locality.processId());
cluster->updateWorkerList.set(worker.locality.processId(), Optional<ProcessData>());