Fixed multiple bugs related to locality-based exclusions (#10623)

* fix: Non-storage processes were not being checked for locality exclusions
fix: Data distribution would not detect when a newly added process was locality excluded
fix: RemoveServersSafely did not wait for processes to be excluded before killing them when excluding localities

* fix: Do not allow locality-based excludes if they cannot exclude the required addresses
Evan Tschannen 2023-08-11 15:17:02 -07:00 committed by GitHub
parent 3b295b4c93
commit 3209dc7b30
8 changed files with 74 additions and 41 deletions
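
The core API change behind these fixes: DatabaseConfiguration::isExcludedServer now takes the process's LocalityData in addition to its addresses, so every call site below checks locality-based excludes as well. A minimal sketch of the new call shape, assuming the usual FDB headers for DatabaseConfiguration and WorkerDetails are available (fitnessFor is a hypothetical helper, not a call site from this diff; the pattern mirrors the cluster controller changes below):

```cpp
#include <algorithm>

// Sketch only: excluded processes are deprioritized for recruitment rather than
// skipped outright, and the exclusion check now covers both addresses and locality.
ProcessClass::Fitness fitnessFor(const DatabaseConfiguration& conf,
                                 const WorkerDetails& worker,
                                 ProcessClass::ClusterRole role) {
	ProcessClass::Fitness fitness = worker.processClass.machineClassFitness(role);
	if (conf.isExcludedServer(worker.interf.addresses(), worker.interf.locality)) {
		fitness = std::max(fitness, ProcessClass::ExcludeFit); // rank excluded workers last
	}
	return fitness;
}
```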

@ -746,7 +746,7 @@ Optional<ValueRef> DatabaseConfiguration::get(KeyRef key) const {
}
}
bool DatabaseConfiguration::isExcludedServer(NetworkAddressList a) const {
bool DatabaseConfiguration::isExcludedServer(NetworkAddressList a, const LocalityData& locality) const {
return get(encodeExcludedServersKey(AddressExclusion(a.address.ip, a.address.port))).present() ||
get(encodeExcludedServersKey(AddressExclusion(a.address.ip))).present() ||
get(encodeFailedServersKey(AddressExclusion(a.address.ip, a.address.port))).present() ||
@ -757,7 +757,8 @@ bool DatabaseConfiguration::isExcludedServer(NetworkAddressList a) const {
get(encodeExcludedServersKey(AddressExclusion(a.secondaryAddress.get().ip))).present() ||
get(encodeFailedServersKey(AddressExclusion(a.secondaryAddress.get().ip, a.secondaryAddress.get().port)))
.present() ||
get(encodeFailedServersKey(AddressExclusion(a.secondaryAddress.get().ip))).present()));
get(encodeFailedServersKey(AddressExclusion(a.secondaryAddress.get().ip))).present())) ||
isExcludedLocality(locality);
}
std::set<AddressExclusion> DatabaseConfiguration::getExcludedServers() const {
const_cast<DatabaseConfiguration*>(this)->makeConfigurationImmutable();
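
For context, isExcludedLocality (declared in the header hunk below) is what the new argument feeds into: a process is locality-excluded if any of its locality key/value pairs appears among the excluded or failed locality entries. A hedged conceptual sketch follows; matchesLocalityExclusion is an illustrative free function, not FDB's actual implementation, but the "<ExcludeLocalityPrefix><key>:<value>" entry format matches the RemoveServersSafely hunk at the end of this diff:

```cpp
#include <set>
#include <string>

// Hedged sketch (illustrative, not the actual DatabaseConfiguration code):
// check each of the process's locality pairs against a set of exclusion entries
// of the form prefix + key + ":" + value, e.g. what `exclude locality_zoneid:<zone>` writes.
bool matchesLocalityExclusion(const LocalityData& locality, const std::set<std::string>& excludedLocalities) {
	for (const auto& l : locality.getAllData()) {
		if (excludedLocalities.count(LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second)) {
			return true;
		}
	}
	return false;
}
```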

@ -264,7 +264,7 @@ struct DatabaseConfiguration {
EncryptionAtRestMode encryptionAtRestMode;
// Excluded servers (no state should be here)
bool isExcludedServer(NetworkAddressList) const;
bool isExcludedServer(NetworkAddressList, const LocalityData& locality) const;
bool isExcludedLocality(const LocalityData& locality) const;
bool isMachineExcluded(const LocalityData& locality) const;
std::set<AddressExclusion> getExcludedServers() const;

@ -533,7 +533,7 @@ ProcessClass::Fitness findBestFitnessForSingleton(const ClusterControllerData* s
// If the process has been marked as excluded, we take the max with ExcludeFit to ensure its fit
// is at least as bad as ExcludeFit. This assists with successfully offboarding such processes
// and removing them from the cluster.
if (self->db.config.isExcludedServer(worker.interf.addresses())) {
if (self->db.config.isExcludedServer(worker.interf.addresses(), worker.interf.locality)) {
bestFitness = std::max(bestFitness, ProcessClass::ExcludeFit);
}
return bestFitness;
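
Why take the max with ExcludeFit rather than skip the worker: in ProcessClass::Fitness, ExcludeFit sorts worse than every usable fitness but better than NeverAssign, so an excluded process is only chosen when nothing else is available while it is being drained. A hedged sketch of the ordering being relied on (the enumerator list here is an assumption; only the relative order matters):

```cpp
// Hedged sketch, not the actual ProcessClass definition: std::max(fitness, ExcludeFit)
// pushes excluded workers to the back of the candidate list without making them
// unrecruitable, since ExcludeFit still compares better than NeverAssign.
enum Fitness { BestFit, GoodFit, UnsetFit, OkayFit, WorstFit, ExcludeFit, NeverAssign };
static_assert(ExcludeFit < NeverAssign, "excluded processes remain a last-resort candidate");
```
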
@ -1032,8 +1032,8 @@ void clusterRegisterMaster(ClusterControllerData* self, RegisterMasterRequest co
self->gotFullyRecoveredConfig = true;
db->fullyRecoveredConfig = req.configuration.get();
for (auto& it : self->id_worker) {
bool isExcludedFromConfig =
db->fullyRecoveredConfig.isExcludedServer(it.second.details.interf.addresses());
bool isExcludedFromConfig = db->fullyRecoveredConfig.isExcludedServer(
it.second.details.interf.addresses(), it.second.details.interf.locality);
if (it.second.priorityInfo.isExcluded != isExcludedFromConfig) {
it.second.priorityInfo.isExcluded = isExcludedFromConfig;
if (!it.second.reply.isSet()) {
@ -1255,7 +1255,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
}
if (self->gotFullyRecoveredConfig) {
newPriorityInfo.isExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.addresses());
newPriorityInfo.isExcluded = self->db.fullyRecoveredConfig.isExcludedServer(w.addresses(), w.locality);
}
}
@ -3133,7 +3133,8 @@ ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf,
for (auto const& [id, worker] : self.id_worker) {
if ((req.flags & GetWorkersRequest::NON_EXCLUDED_PROCESSES_ONLY) &&
self.db.config.isExcludedServer(worker.details.interf.addresses())) {
self.db.config.isExcludedServer(worker.details.interf.addresses(),
worker.details.interf.locality)) {
continue;
}

@ -392,7 +392,7 @@ JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
tempList.address = it->first;
bool excludedServer = true;
bool excludedLocality = true;
if (configuration.present() && !configuration.get().isExcludedServer(tempList))
if (configuration.present() && !configuration.get().isExcludedServer(tempList, LocalityData()))
excludedServer = false;
if (locality.count(it->first) && configuration.present() &&
!configuration.get().isMachineExcluded(locality[it->first]))
@ -1086,8 +1086,8 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj["roles"] = roles.getStatusForAddress(address);
if (configuration.present()) {
statusObj["excluded"] = configuration.get().isExcludedServer(workerItr->interf.addresses()) ||
configuration.get().isExcludedLocality(workerItr->interf.locality);
statusObj["excluded"] =
configuration.get().isExcludedServer(workerItr->interf.addresses(), workerItr->interf.locality);
}
statusObj["class_type"] = workerItr->processClass.toString();
@ -2080,7 +2080,7 @@ static int getExtraTLogEligibleZones(const std::vector<WorkerDetails>& workers,
std::map<Key, std::set<StringRef>> dcId_zone;
for (auto const& worker : workers) {
if (worker.processClass.machineClassFitness(ProcessClass::TLog) < ProcessClass::NeverAssign &&
!configuration.isExcludedServer(worker.interf.addresses())) {
!configuration.isExcludedServer(worker.interf.addresses(), worker.interf.locality)) {
allZones.insert(worker.interf.locality.zoneId().get());
if (worker.interf.locality.dcId().present()) {
dcId_zone[worker.interf.locality.dcId().get()].insert(worker.interf.locality.zoneId().get());

@ -430,7 +430,7 @@ public:
for (auto& it : id_worker) {
auto fitness = it.second.details.processClass.machineClassFitness(ProcessClass::Storage);
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
!conf.isExcludedServer(it.second.details.interf.addresses()) &&
!conf.isExcludedServer(it.second.details.interf.addresses(), it.second.details.interf.locality) &&
!isExcludedDegradedServer(it.second.details.interf.addresses()) &&
fitness != ProcessClass::NeverAssign &&
(!dcId.present() || it.second.details.interf.locality.dcId() == dcId.get())) {
@ -667,7 +667,7 @@ public:
SevInfo, id, "complex", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
if (conf.isExcludedServer(worker_details.interf.addresses(), worker_details.interf.locality)) {
logWorkerUnavailable(SevInfo,
id,
"complex",
@ -917,7 +917,7 @@ public:
SevInfo, id, "simple", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
if (conf.isExcludedServer(worker_details.interf.addresses(), worker_details.interf.locality)) {
logWorkerUnavailable(SevInfo,
id,
"simple",
@ -1069,7 +1069,7 @@ public:
SevInfo, id, "deprecated", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
if (conf.isExcludedServer(worker_details.interf.addresses(), worker_details.interf.locality)) {
logWorkerUnavailable(SevInfo,
id,
"deprecated",
@ -1512,7 +1512,7 @@ public:
for (auto& it : id_worker) {
auto fitness = it.second.details.processClass.machineClassFitness(role);
if (conf.isExcludedServer(it.second.details.interf.addresses()) ||
if (conf.isExcludedServer(it.second.details.interf.addresses(), it.second.details.interf.locality) ||
isExcludedDegradedServer(it.second.details.interf.addresses())) {
fitness = std::max(fitness, ProcessClass::ExcludeFit);
}
@ -1559,7 +1559,7 @@ public:
for (auto& it : id_worker) {
auto fitness = it.second.details.processClass.machineClassFitness(role);
if (workerAvailable(it.second, checkStable) &&
!conf.isExcludedServer(it.second.details.interf.addresses()) &&
!conf.isExcludedServer(it.second.details.interf.addresses(), it.second.details.interf.locality) &&
!isExcludedDegradedServer(it.second.details.interf.addresses()) &&
it.second.details.interf.locality.dcId() == dcId &&
(!minWorker.present() ||
@ -1696,7 +1696,7 @@ public:
std::set<Optional<Standalone<StringRef>>> result;
for (auto& it : id_worker)
if (workerAvailable(it.second, checkStable) &&
!conf.isExcludedServer(it.second.details.interf.addresses()) &&
!conf.isExcludedServer(it.second.details.interf.addresses(), it.second.details.interf.locality) &&
!isExcludedDegradedServer(it.second.details.interf.addresses()))
result.insert(it.second.details.interf.locality.dcId());
return result;
@ -2210,7 +2210,7 @@ public:
auto w = id_worker.find(worker.locality.processId());
ASSERT(w != id_worker.end());
auto const& [_, workerInfo] = *w;
ASSERT(!conf.isExcludedServer(workerInfo.details.interf.addresses()));
ASSERT(!conf.isExcludedServer(workerInfo.details.interf.addresses(), workerInfo.details.interf.locality));
firstDetails.push_back(workerInfo.details);
//TraceEvent("CompareAddressesFirst").detail(description.c_str(), w->second.details.interf.address());
}
@ -2221,7 +2221,7 @@ public:
auto w = id_worker.find(worker.locality.processId());
ASSERT(w != id_worker.end());
auto const& [_, workerInfo] = *w;
ASSERT(!conf.isExcludedServer(workerInfo.details.interf.addresses()));
ASSERT(!conf.isExcludedServer(workerInfo.details.interf.addresses(), workerInfo.details.interf.locality));
secondDetails.push_back(workerInfo.details);
//TraceEvent("CompareAddressesSecond").detail(description.c_str(), w->second.details.interf.address());
}
@ -2568,7 +2568,7 @@ public:
// still need master for recovery.
ProcessClass::Fitness oldMasterFit =
masterWorker->second.details.processClass.machineClassFitness(ProcessClass::Master);
if (db.config.isExcludedServer(dbi.master.addresses())) {
if (db.config.isExcludedServer(dbi.master.addresses(), dbi.master.locality)) {
oldMasterFit = std::max(oldMasterFit, ProcessClass::ExcludeFit);
}
@ -2579,7 +2579,7 @@ public:
WorkerFitnessInfo mworker = getWorkerForRoleInDatacenter(
clusterControllerDcId, ProcessClass::Master, ProcessClass::NeverAssign, db.config, id_used, {}, true);
auto newMasterFit = mworker.worker.processClass.machineClassFitness(ProcessClass::Master);
if (db.config.isExcludedServer(mworker.worker.interf.addresses())) {
if (db.config.isExcludedServer(mworker.worker.interf.addresses(), mworker.worker.interf.locality)) {
newMasterFit = std::max(newMasterFit, ProcessClass::ExcludeFit);
}

@ -108,14 +108,29 @@ struct ExclusionTracker {
newFailed.insert(localityFailedAddresses.begin(), localityFailedAddresses.end());
}
self->excluded = newExcluded;
self->failed = newFailed;
self->changed.trigger();
bool foundChange = false;
if (self->excluded != newExcluded) {
self->excluded = newExcluded;
foundChange = true;
}
if (self->failed != newFailed) {
self->failed = newFailed;
foundChange = true;
}
if (foundChange) {
self->changed.trigger();
}
state Future<Void> watchFuture =
tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey) ||
tr.watch(excludedLocalityVersionKey) || tr.watch(failedLocalityVersionKey);
wait(tr.commit());
if (excludedLocalityResults.size() > 0 || failedLocalityResults.size() > 0) {
// when there are excluded localities we need to monitor for when the worker list changes, so we
// must poll
watchFuture = watchFuture || delay(10.0);
}
wait(watchFuture);
tr.reset();
} catch (Error& e) {
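
This hunk is the data-distribution detection fix: a newly registered process can match an already-existing locality exclusion without any of the watched exclusion keys changing, so the tracker now also wakes up every 10 seconds whenever locality exclusions are present, and only fires its trigger when the computed sets actually changed, so the periodic wakeups do not spam consumers. A hedged sketch of the consumer side (exclusionTracker and processExclusionChanges are illustrative names; the loop runs inside an actor):

```cpp
// Hedged consumer sketch: wait for the tracker to signal a real change, then
// re-read the excluded/failed sets. The 10s locality poll above does not wake
// this loop unless the sets actually differ.
loop {
	wait(exclusionTracker.changed.onTrigger());
	processExclusionChanges(exclusionTracker.excluded, exclusionTracker.failed);
}
```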

@ -1055,7 +1055,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
for (int i = 0; i < workers.size(); i++) {
NetworkAddress addr = workers[i].interf.stableAddress();
if (!configuration.isExcludedServer(workers[i].interf.addresses()) &&
if (!configuration.isExcludedServer(workers[i].interf.addresses(), workers[i].interf.locality) &&
(workers[i].processClass == ProcessClass::StorageClass ||
workers[i].processClass == ProcessClass::UnsetClass)) {
bool found = false;
@ -1271,7 +1271,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
for (const auto& worker : workers) {
NetworkAddress addr = worker.interf.stableAddress();
bool inCCDc = worker.interf.locality.dcId() == ccDcId;
if (!configuration.isExcludedServer(worker.interf.addresses())) {
if (!configuration.isExcludedServer(worker.interf.addresses(), worker.interf.locality)) {
if (worker.processClass == ProcessClass::BlobWorkerClass) {
numBlobWorkerProcesses++;

@ -637,9 +637,9 @@ struct RemoveServersSafelyWorkload : TestWorkload {
state bool excludeLocalitiesInsteadOfServers = deterministicRandom()->coinflip();
if (markExcludeAsFailed) {
if (excludeLocalitiesInsteadOfServers) {
state std::unordered_set<std::string> toKillLocalitiesFailed =
self->getLocalitiesFromAddresses(toKillMarkFailedArray);
state std::unordered_set<std::string> toKillLocalitiesFailed =
self->getLocalitiesFromAddresses(toKillMarkFailedArray);
if (excludeLocalitiesInsteadOfServers && toKillLocalitiesFailed.size() > 0) {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding localities with failed option")
.detail("FailedAddressesSize", toKillMarkFailedArray.size())
@ -658,8 +658,8 @@ struct RemoveServersSafelyWorkload : TestWorkload {
}
}
if (excludeLocalitiesInsteadOfServers) {
state std::unordered_set<std::string> toKillLocalities = self->getLocalitiesFromAddresses(toKillArray);
state std::unordered_set<std::string> toKillLocalities = self->getLocalitiesFromAddresses(toKillArray);
if (excludeLocalitiesInsteadOfServers && toKillLocalities.size() > 0) {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding localities without failed option")
.detail("AddressesSize", toKillArray.size())
@ -680,14 +680,12 @@ struct RemoveServersSafelyWorkload : TestWorkload {
// We need to skip at least the quorum change if there's nothing to kill, because there might not be enough
// servers left alive to do a coordinators auto (?)
if (toKill.size()) {
if (!excludeLocalitiesInsteadOfServers) {
// Wait for removal to be safe
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Wait For Server Exclusion")
.detail("Addresses", describe(toKill))
.detail("ClusterAvailable", g_simulator->isAvailable());
wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
}
// Wait for removal to be safe
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Wait For Server Exclusion")
.detail("Addresses", describe(toKill))
.detail("ClusterAvailable", g_simulator->isAvailable());
wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "coordinators auto")
@ -784,6 +782,24 @@ struct RemoveServersSafelyWorkload : TestWorkload {
}
}
for (const auto& processInfo : processes) {
AddressExclusion pAddr(processInfo->address.ip, processInfo->address.port);
if (std::find(addresses.begin(), addresses.end(), pAddr) != addresses.end()) {
std::map<std::string, std::string> localityData = processInfo->locality.getAllData();
bool found = false;
for (const auto& l : localityData) {
if (toKillLocalities.count(LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" +
l.second)) {
found = true;
break;
}
}
if (!found) {
return std::unordered_set<std::string>();
}
}
}
return toKillLocalities;
}
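
The added check gives getLocalitiesFromAddresses a simple contract: it returns a non-empty set only when every address slated to be killed is covered by at least one of the chosen locality exclusions; otherwise it returns an empty set, and the size() > 0 guards added earlier in this diff make the workload fall back to excluding by address. A hedged sketch of that caller-side fallback (excludeByLocality and excludeByAddress are illustrative stand-ins, not this workload's actual helpers):

```cpp
// Hedged caller sketch: prefer locality-based exclusion only when it can cover
// every target address; otherwise exclude by address so nothing is missed.
std::unordered_set<std::string> toKillLocalities = self->getLocalitiesFromAddresses(toKillArray);
if (excludeLocalitiesInsteadOfServers && toKillLocalities.size() > 0) {
	excludeByLocality(toKillLocalities); // every address in toKillArray is matched by some locality
} else {
	excludeByAddress(toKillArray); // locality exclusion could not express this kill set
}
```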