Merge pull request #11009 from johscheuer/verify-localities-and-server

Make sure server list is validated against the excluded localities
This commit is contained in:
Jingyu Zhou 2023-10-24 11:00:18 -07:00 committed by GitHub
commit c5f791d11a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 116 additions and 50 deletions

View File

@ -170,9 +170,8 @@ ACTOR Future<std::vector<std::string>> getFailedLocalities(Reference<IDatabase>
}
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Reference<IDatabase> db,
std::vector<AddressExclusion> excl,
std::set<AddressExclusion> exclusions,
bool waitForAllExcluded) {
state std::set<AddressExclusion> exclusions(excl.begin(), excl.end());
state std::set<NetworkAddress> inProgressExclusion;
state Reference<ITransaction> tr = db->createTransaction();
loop {
@ -220,7 +219,7 @@ ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Reference<IDatab
return inProgressExclusion;
}
ACTOR Future<Void> checkForCoordinators(Reference<IDatabase> db, std::vector<AddressExclusion> exclusionVector) {
ACTOR Future<Void> checkForCoordinators(Reference<IDatabase> db, std::set<AddressExclusion> exclusions) {
state bool foundCoordinator = false;
state std::vector<NetworkAddress> coordinatorList;
@ -237,9 +236,10 @@ ACTOR Future<Void> checkForCoordinators(Reference<IDatabase> db, std::vector<Add
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
for (const auto& c : coordinatorList) {
if (std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip, c.port)) ||
std::count(exclusionVector.begin(), exclusionVector.end(), AddressExclusion(c.ip))) {
if (exclusions.find(AddressExclusion(c.ip, c.port)) != exclusions.end() ||
exclusions.find(AddressExclusion(c.ip)) != exclusions.end()) {
fprintf(stderr, "WARNING: %s is a coordinator!\n", c.toString().c_str());
foundCoordinator = true;
}
@ -310,7 +310,6 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
return true;
} else {
state std::vector<AddressExclusion> exclusionVector;
state std::set<AddressExclusion> exclusionSet;
state std::vector<AddressExclusion> exclusionAddresses;
state std::unordered_set<std::string> exclusionLocalities;
@ -319,9 +318,11 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
state bool waitForAllExcluded = true;
state bool markFailed = false;
state std::vector<ProcessData> workers;
bool result = wait(fdb_cli::getWorkers(db, &workers));
if (!result)
return false;
state std::map<std::string, StorageServerInterface> server_interfaces;
state Future<bool> future_workers = fdb_cli::getWorkers(db, &workers);
state Future<Void> future_server_interfaces = fdb_cli::getStorageServerInterfaces(db, &server_interfaces);
wait(success(future_workers) && success(future_server_interfaces));
for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) {
if (*t == "FORCE"_sr) {
force = true;
@ -331,15 +332,21 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
markFailed = true;
} else if (t->startsWith(LocalityData::ExcludeLocalityPrefix) &&
t->toString().find(':') != std::string::npos) {
std::set<AddressExclusion> localityAddresses = getAddressesByLocality(workers, t->toString());
if (localityAddresses.empty()) {
exclusionLocalities.insert(t->toString());
auto localityAddresses = getAddressesByLocality(workers, t->toString());
auto localityServerAddresses = getServerAddressesByLocality(server_interfaces, t->toString());
if (localityAddresses.empty() && localityServerAddresses.empty()) {
noMatchLocalities.push_back(t->toString());
} else {
// add all the server ipaddresses that belong to the given localities to the exclusionSet.
exclusionVector.insert(exclusionVector.end(), localityAddresses.begin(), localityAddresses.end());
continue;
}
if (!localityAddresses.empty()) {
exclusionSet.insert(localityAddresses.begin(), localityAddresses.end());
}
exclusionLocalities.insert(t->toString());
if (!localityServerAddresses.empty()) {
exclusionSet.insert(localityServerAddresses.begin(), localityServerAddresses.end());
}
} else {
auto a = AddressExclusion::parse(*t);
if (!a.isValid()) {
@ -350,13 +357,12 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
}
exclusionVector.push_back(a);
exclusionSet.insert(a);
exclusionAddresses.push_back(a);
}
}
if (exclusionAddresses.empty() && exclusionLocalities.empty()) {
if (exclusionSet.empty()) {
fprintf(stderr, "ERROR: At least one valid network endpoint address or a locality is not provided\n");
return false;
}
@ -374,14 +380,14 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
warn.cancel();
state std::set<NetworkAddress> notExcludedServers =
wait(checkForExcludingServers(db, exclusionVector, waitForAllExcluded));
wait(checkForExcludingServers(db, exclusionSet, waitForAllExcluded));
std::map<IPAddress, std::set<uint16_t>> workerPorts;
for (auto addr : workers)
workerPorts[addr.address.ip].insert(addr.address.port);
// Print a list of all excluded addresses that don't have a corresponding worker
std::set<AddressExclusion> absentExclusions;
for (const auto& addr : exclusionVector) {
for (const auto& addr : exclusionSet) {
auto worker = workerPorts.find(addr.ip);
if (worker == workerPorts.end())
absentExclusions.insert(addr);
@ -389,7 +395,7 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
absentExclusions.insert(addr);
}
for (const auto& exclusion : exclusionVector) {
for (const auto& exclusion : exclusionSet) {
if (absentExclusions.find(exclusion) != absentExclusions.end()) {
if (exclusion.port == 0) {
fprintf(stderr,
@ -437,7 +443,7 @@ ACTOR Future<bool> excludeCommandActor(Reference<IDatabase> db, std::vector<Stri
locality.c_str());
}
wait(checkForCoordinators(db, exclusionVector));
wait(checkForCoordinators(db, exclusionSet));
return true;
}
}

View File

@ -797,20 +797,6 @@ bool DatabaseConfiguration::isExcludedLocality(const LocalityData& locality) con
return false;
}
// Reports whether the machine id carried by `locality` is excluded.
// The machine id is looked up under both the "excluded" and the "failed" locality keys;
// a hit under either one counts. A locality without a machine id is never excluded.
bool DatabaseConfiguration::isMachineExcluded(const LocalityData& locality) const {
	if (!locality.machineId().present()) {
		return false;
	}
	// Both lookups use the same "<machine id prefix><machine id>" locality string.
	const std::string machineLocality =
	    LocalityData::ExcludeLocalityKeyMachineIdPrefix.toString() + locality.machineId().get().toString();
	if (get(encodeExcludedLocalityKey(machineLocality)).present()) {
		return true;
	}
	return get(encodeFailedLocalityKey(machineLocality)).present();
}
// Gets the list of already excluded localities (with failed option)
std::set<std::string> DatabaseConfiguration::getExcludedLocalities() const {
// TODO: revisit all const_cast usages
@ -897,4 +883,4 @@ TEST_CASE("/fdbclient/databaseConfiguration/overwriteCommitProxy") {
ASSERT(conf1.getDesiredCommitProxies() == conf2.getDesiredCommitProxies());
return Void();
}
}

View File

@ -2146,21 +2146,56 @@ std::pair<std::string, std::string> decodeLocality(const std::string& locality)
return std::make_pair("", "");
}
// Returns the list of IPAddresses of the servers that match the given locality.
// Example: locality="dcid:primary" returns all the ip addresses of the servers in the primary dc.
std::set<AddressExclusion> getServerAddressesByLocality(
const std::map<std::string, StorageServerInterface> server_interfaces,
const std::string& locality) {
std::pair<std::string, std::string> locality_key_value = decodeLocality(locality);
std::set<AddressExclusion> locality_addresses;
for (auto& server : server_interfaces) {
auto locality_value = server.second.locality.get(locality_key_value.first);
if (!locality_value.present()) {
continue;
}
if (locality_value.get() != locality_key_value.second) {
continue;
}
auto primary_address = server.second.address();
locality_addresses.insert(AddressExclusion(primary_address.ip, primary_address.port));
if (server.second.secondaryAddress().present()) {
auto secondary_address = server.second.secondaryAddress().get();
locality_addresses.insert(AddressExclusion(secondary_address.ip, secondary_address.port));
}
}
return locality_addresses;
}
// Returns the list of IPAddresses of the workers that match the given locality.
// Example: locality="locality_dcid:primary" returns all the ip addresses of the workers in the primary dc.
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers,
const std::string& locality) {
std::pair<std::string, std::string> localityKeyValue = decodeLocality(locality);
std::pair<std::string, std::string> locality_key_value = decodeLocality(locality);
std::set<AddressExclusion> locality_addresses;
std::set<AddressExclusion> localityAddresses;
for (int i = 0; i < workers.size(); i++) {
auto localityValue = workers[i].locality.get(localityKeyValue.first);
if (localityValue.present() && localityValue.get() == localityKeyValue.second) {
localityAddresses.insert(AddressExclusion(workers[i].address.ip, workers[i].address.port));
auto locality_value = workers[i].locality.get(locality_key_value.first);
if (!locality_value.present()) {
continue;
}
if (locality_value.get() != locality_key_value.second) {
continue;
}
locality_addresses.insert(AddressExclusion(workers[i].address.ip, workers[i].address.port));
}
return localityAddresses;
return locality_addresses;
}
ACTOR Future<Void> printHealthyZone(Database cx) {

View File

@ -1219,7 +1219,12 @@ ACTOR Future<RangeResult> ExclusionInProgressActor(ReadYourWritesTransaction* ry
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); // necessary?
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state std::vector<AddressExclusion> excl = wait((getAllExcludedServers(&tr)));
state Future<std::vector<AddressExclusion>> fExclusions = getAllExcludedServers(&tr);
state Future<std::vector<std::string>> fExcludedLocalities = getAllExcludedLocalities(&tr);
wait(success(fExclusions) && success(fExcludedLocalities));
state std::vector<AddressExclusion> excl = fExclusions.get();
state std::set<AddressExclusion> exclusions(excl.begin(), excl.end());
state std::set<NetworkAddress> inProgressExclusion;
// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed
@ -1227,18 +1232,48 @@ ACTOR Future<RangeResult> ExclusionInProgressActor(ReadYourWritesTransaction* ry
state RangeResult serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
// We have to make use of the localities here to verify if a server is still in the server list,
// even if it might be missing in the workers as the server is not running anymore.
state std::vector<std::string> excludedLocalities = fExcludedLocalities.get();
// Decode the excluded localities to check if any server is excluded by locality.
state std::vector<std::pair<std::string, std::string>> decodedExcludedLocalities;
for (auto& excludedLocality : excludedLocalities) {
decodedExcludedLocalities.push_back(decodeLocality(excludedLocality));
}
for (auto& s : serverList) {
auto addresses = decodeServerListValue(s.value).getKeyValues.getEndpoint().addresses;
auto decodedServer = decodeServerListValue(s.value);
auto addresses = decodedServer.getKeyValues.getEndpoint().addresses;
if (addressExcluded(exclusions, addresses.address)) {
inProgressExclusion.insert(addresses.address);
}
if (addresses.secondaryAddress.present() && addressExcluded(exclusions, addresses.secondaryAddress.get())) {
inProgressExclusion.insert(addresses.secondaryAddress.get());
}
// Check if the server is excluded based on a locality.
for (auto& excludedLocality : decodedExcludedLocalities) {
if (!decodedServer.locality.isPresent(excludedLocality.first)) {
continue;
}
if (decodedServer.locality.get(excludedLocality.first) != excludedLocality.second) {
continue;
}
inProgressExclusion.insert(addresses.address);
if (addresses.secondaryAddress.present()) {
inProgressExclusion.insert(addresses.secondaryAddress.get());
}
}
}
Optional<Standalone<StringRef>> value = wait(tr.get(logsKey));
ASSERT(value.present());
// TODO(jscheuermann): The logs key range doesn't hold any information about localities. This is a limitation
// for locality-based exclusions. The problematic edge case here is a log server that still has mutations on it
// but is currently not part of the worker list, e.g. because it was shut down or is partitioned.
auto logs = decodeLogsValue(value.get());
for (auto const& log : logs.first) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
@ -1264,6 +1299,7 @@ ACTOR Future<RangeResult> ExclusionInProgressActor(ReadYourWritesTransaction* ry
result.arena().dependsOn(addrKey.arena());
}
}
return result;
}

View File

@ -266,7 +266,6 @@ struct DatabaseConfiguration {
// Excluded servers (no state should be here)
bool isExcludedServer(NetworkAddressList, const LocalityData& locality) const;
bool isExcludedLocality(const LocalityData& locality) const;
bool isMachineExcluded(const LocalityData& locality) const;
std::set<AddressExclusion> getExcludedServers() const;
std::set<std::string> getExcludedLocalities() const;

View File

@ -108,11 +108,17 @@ ACTOR Future<std::vector<std::string>> getExcludedLocalityList(Transaction* tr);
// Get the current list of failed localities.
ACTOR Future<std::vector<std::string>> getExcludedFailedLocalityList(Transaction* tr);
// Decodes the locality string to a pair of locality prefix and its value.
// The prefix could be e.g. dcid, machineid or processid.
std::pair<std::string, std::string> decodeLocality(const std::string& locality);
// Returns the addresses (primary and, if present, secondary) of the storage servers in
// server_interfaces whose locality matches the given locality string.
std::set<AddressExclusion> getServerAddressesByLocality(
const std::map<std::string, StorageServerInterface> server_interfaces,
const std::string& locality);
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers, const std::string& locality);
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and
// unless any of them are explicitly included with includeServers()
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion
// is true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance,
// until and unless any of them are explicitly included with includeServers()
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx,
std::vector<AddressExclusion> servers,
bool waitForAllExcluded);

View File

@ -26,7 +26,6 @@ const StringRef LocalityData::keyZoneId = "zoneid"_sr;
const StringRef LocalityData::keyDcId = "dcid"_sr;
const StringRef LocalityData::keyMachineId = "machineid"_sr;
const StringRef LocalityData::keyDataHallId = "data_hall"_sr;
const StringRef LocalityData::ExcludeLocalityKeyMachineIdPrefix = "locality_machineid:"_sr;
const StringRef LocalityData::ExcludeLocalityPrefix = "locality_"_sr;
ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const {

View File

@ -397,7 +397,6 @@ public:
}
static const UID UNSET_ID;
static const StringRef ExcludeLocalityKeyMachineIdPrefix;
static const StringRef ExcludeLocalityPrefix;
};