diff --git a/documentation/sphinx/source/administration.rst b/documentation/sphinx/source/administration.rst index 553375c979..6df97325ec 100644 --- a/documentation/sphinx/source/administration.rst +++ b/documentation/sphinx/source/administration.rst @@ -182,7 +182,7 @@ You can add new machines to a cluster at any time: user@host2$ sudo service foundationdb restart -5) If you have previously :ref:`excluded ` a machine from the cluster, you will need to take it off the exclusion list using the ``include `` command of fdbcli before it can be a full participant in the cluster. +5) If you have previously :ref:`excluded ` a machine from the cluster, you will need to take it off the exclusion list using the ``include `` or ``include `` command of fdbcli before it can be a full participant in the cluster. .. note:: Addresses have the form ``IP``:``PORT``. This form is used even if TLS is enabled. @@ -207,12 +207,12 @@ To temporarily or permanently remove one or more machines from a FoundationDB cl The database is available. Welcome to the fdbcli. For help, type `help'. - fdb> exclude 1.2.3.4 1.2.3.5 1.2.3.6 + fdb> exclude 1.2.3.4 1.2.3.5 1.2.3.6 locality_dcid:primary-satellite locality_zoneid:primary-satellite-log-2 locality_machineid:primary-stateless-1 locality_processid:223be2da244ca0182375364e4d122c30 or any locality Waiting for state to be removed from all excluded servers. This may take a while. It is now safe to remove these machines or processes from the cluster. -``exclude`` can be used to exclude either machines (by specifying an IP address) or individual processes (by specifying an ``IP``:``PORT`` pair). +``exclude`` can be used to exclude either machines (by specifying an IP address) or individual processes (by specifying an ``IP``:``PORT`` pair or by specifying a locality match). .. note:: Addresses have the form ``IP``:``PORT``. This form is used even if TLS is enabled. 
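The `exclude` tokens above come in two shapes: a plain address (`IP` or `IP:PORT`) or a locality match of the form `locality_<key>:<value>`. As a rough illustration of how a parser can tell the two apart (a standalone sketch, not the actual fdbcli code; `TokenKind` and `classifyExclusionToken` are hypothetical names):

```cpp
#include <cassert>
#include <string>

// Hypothetical classification of an exclusion token: anything carrying the
// "locality_" prefix together with a ':' separator is treated as a locality
// match; everything else falls through to address parsing.
enum class TokenKind { Address, Locality };

TokenKind classifyExclusionToken(const std::string& token) {
    const std::string prefix = "locality_";
    // rfind(prefix, 0) == 0 checks that the token starts with the prefix.
    if (token.rfind(prefix, 0) == 0 && token.find(':') != std::string::npos)
        return TokenKind::Locality;
    return TokenKind::Address;
}
```

This mirrors the check the patch adds in `fdbcli.actor.cpp` (`t->startsWith(LocalityData::ExcludeLocalityPrefix)` plus a `':'` lookup) before falling back to `AddressExclusion::parse`.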
diff --git a/documentation/sphinx/source/command-line-interface.rst b/documentation/sphinx/source/command-line-interface.rst index 78fd074db0..c7d717d799 100644 --- a/documentation/sphinx/source/command-line-interface.rst +++ b/documentation/sphinx/source/command-line-interface.rst @@ -139,9 +139,9 @@ For more information on setting the cluster description, see :ref:`configuration exclude ------- -The ``exclude`` command excludes servers from the database or marks them as failed. Its syntax is ``exclude [failed] ``. If no addresses are specified, the command provides the set of excluded and failed servers. +The ``exclude`` command excludes servers from the database or marks them as failed. Its syntax is ``exclude [failed] [] [locality_dcid:] [locality_zoneid:] [locality_machineid:] [locality_processid:] or any locality``. If no addresses are specified, the command provides the set of excluded and failed servers and localities. -For each IP address or IP:port pair in ````, the command adds the address to the set of excluded servers. It then waits until all database state has been safely moved off the specified servers. +For each IP address or IP:port pair in ```` or locality (which includes anything set on LocalityData like dcid, zoneid, machineid, processid), the command adds the address/locality to the set of excluded servers and localities. It then waits until all database state has been safely moved off the specified servers. If the ``failed`` keyword is specified, the address is marked as failed and added to the set of failed servers. It will not wait for the database state to move off the specified servers. @@ -246,15 +246,15 @@ The following options are available for use with the ``option`` command: include ------- -The ``include`` command permits previously excluded or failed servers to rejoin the database. Its syntax is ``include [failed] all|``. +The ``include`` command permits previously excluded or failed servers/localities to rejoin the database.
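Once a token is known to be a locality match, it decomposes into a locality key and value. A minimal sketch of that split, mirroring the `decodeLocality` helper this patch adds in `ManagementAPI.actor.cpp` (simplified here to plain `std::string`; `decodeLocalitySketch` is a hypothetical name, not the real API):

```cpp
#include <cassert>
#include <string>
#include <utility>

// Split "locality_<key>:<value>" into its (key, value) pair. Malformed input
// (missing prefix or missing ':' separator) yields an empty pair, matching
// the fallback behavior of the decodeLocality helper in the patch.
std::pair<std::string, std::string> decodeLocalitySketch(const std::string& token) {
    const std::string prefix = "locality_";
    if (token.rfind(prefix, 0) != 0)
        return { "", "" };  // not a locality token at all
    std::string keyValue = token.substr(prefix.size());
    size_t split = keyValue.find(':');
    if (split == std::string::npos)
        return { "", "" };  // no key/value separator
    return { keyValue.substr(0, split), keyValue.substr(split + 1) };
}
```

For example, `locality_zoneid:primary-satellite-log-2` decodes to the pair `("zoneid", "primary-satellite-log-2")`.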
Its syntax is ``include [failed] all|[] [locality_dcid:] [locality_zoneid:] [locality_machineid:] [locality_processid:] or any locality``. The ``failed`` keyword is required if the servers were previously marked as failed rather than excluded. -If ``all`` is specified, the excluded servers list is cleared. This will not clear the failed servers list. +If ``all`` is specified, the excluded servers and localities list is cleared. This will not clear the failed servers and localities list. -If ``failed all`` or ``all failed`` is specified, the failed servers list is cleared. This will not clear the excluded servers list. +If ``failed all`` or ``all failed`` is specified, the failed servers and localities list is cleared. This will not clear the excluded servers and localities list. -For each IP address or IP:port pair in ````, the command removes any matching exclusions from the excluded servers list. (A specified IP will match all ``IP:*`` exclusion entries). +For each IP address or IP:port pair in ```` or locality, the command removes any matching exclusions from the excluded servers/localities list. (A specified IP will match all ``IP:*`` exclusion entries). For information on adding machines to a cluster, see :ref:`adding-machines-to-a-cluster`. diff --git a/documentation/sphinx/source/moving-a-cluster.rst b/documentation/sphinx/source/moving-a-cluster.rst index 56cbcc27e0..0e6f49ef70 100644 --- a/documentation/sphinx/source/moving-a-cluster.rst +++ b/documentation/sphinx/source/moving-a-cluster.rst @@ -38,7 +38,7 @@ $ sudo service foundationdb stop 7. Exclude the original machines from the cluster using ``exclude`` in ``fdbcli``. This command will not return until all database state has been moved off of the original machines and fully replicated to the new machines. 
For example:: - fdb> exclude 192.168.1.1:4500 192.168.1.2:4500 192.168.1.3:4500 + fdb> exclude 192.168.1.1:4500 192.168.1.2:4500 192.168.1.3:4500 locality_dcid:primary-satellite locality_zoneid:primary-satellite-log-2 locality_machineid:primary-stateless-1 locality_processid:223be2da244ca0182375364e4d122c30 or any locality 8. Run ``coordinators auto`` in ``fdbcli`` to move coordination state to the new machines. Please note that this will cause the fdb.cluster file to be updated with the addresses of the new machines. Any currently connected clients will be notified and (assuming they have appropriate file system :ref:`permissions `) will update their own copy of the cluster file. As long as the original machines are still running, any clients that connect to them will be automatically forwarded to the new cluster coordinators. However, if you have a client that has not yet connected or only connects intermittently, you will need to copy the new cluster file from one of the new machines to the client machine. diff --git a/documentation/sphinx/source/mr-status-json-schemas.rst.inc b/documentation/sphinx/source/mr-status-json-schemas.rst.inc index 34ffcfc4c5..1eb6b3db82 100644 --- a/documentation/sphinx/source/mr-status-json-schemas.rst.inc +++ b/documentation/sphinx/source/mr-status-json-schemas.rst.inc @@ -695,7 +695,8 @@ "coordinators_count":1, "excluded_servers":[ { - "address":"10.0.4.1" + "address":"10.0.4.1", + "locality":"locality_processid:e9816ca4a89ff64ddb7ba2a5ec10b75b" } ], "auto_commit_proxies":3, diff --git a/documentation/sphinx/source/special-keys.rst b/documentation/sphinx/source/special-keys.rst index 4d8abdf177..4d80169f6f 100644 --- a/documentation/sphinx/source/special-keys.rst +++ b/documentation/sphinx/source/special-keys.rst @@ -201,10 +201,16 @@ that process, and wait for necessary data to be moved away. #. ``\xff\xff/management/consistency_check_suspended`` Read/write. 
Set or read this key will set or read the underlying system key ``\xff\x02/ConsistencyCheck/Suspend``. The value of this special key is unused thus if present, will be empty. In particular, if the key exists, then consistency is suspended. For more details, see help text of ``fdbcli`` command ``consistencycheck``. #. ``\xff\xff/management/db_locked`` Read/write. A single key that can be read and modified. Set the key will lock the database and clear the key will unlock. If the database is already locked, then the commit will fail with the ``special_keys_api_failure`` error. For more details, see help text of ``fdbcli`` command ``lock`` and ``unlock``. #. ``\xff\xff/management/auto_coordinators`` Read-only. A single key, if read, will return a set of processes which is able to satisfy the current redundency level and serve as new coordinators. The return value is formatted as a comma delimited string of network addresses of coordinators, i.e. ``,,...,``. +#. ``\xff\xff/management/excluded_locality/`` Read/write. Indicates that the cluster should move data away from processes matching ````, so that they can be safely removed. See :ref:`removing machines from a cluster ` for documentation for the corresponding fdbcli command. +#. ``\xff\xff/management/failed_locality/`` Read/write. Indicates that the cluster should consider matching processes as permanently failed. This allows the cluster to avoid maintaining extra state and doing extra work in the hope that these processes come back. See :ref:`removing machines from a cluster ` for documentation for the corresponding fdbcli command. +#. ``\xff\xff/management/options/excluded_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded_locality/``. Setting this key only has an effect in the current transaction and is not persisted on commit. +#. ``\xff\xff/management/options/failed_locality/force`` Read/write. 
Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/``. Setting this key only has an effect in the current transaction and is not persisted on commit. An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or -an ip address and port (e.g. ``127.0.0.1:4500``). If no port is specified, -then all processes on that host match the exclusion. +an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g. ``locality_dcid:primary-satellite`` or +``locality_zoneid:primary-satellite-log-2`` or ``locality_machineid:primary-stateless-1`` or ``locality_processid:223be2da244ca0182375364e4d122c30``). +If no port is specified, then all processes on that host match the exclusion. +For locality, all processes that match the given locality are excluded. Configuration module -------------------- diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index fa388109f9..7e7abd2e3c 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -546,21 +546,26 @@ void initHelp() { "10.0.0.1:4000 10.0.0.2:4000 10.0.0.3:4000\n\nIf 'description=desc' is specified then the description field in " "the cluster\nfile is changed to desc, which must match [A-Za-z0-9_]+."); helpMap["exclude"] = CommandHelp( - "exclude [FORCE] [failed] [no_wait] ", - "exclude servers from the database", - "If no addresses are specified, lists the set of excluded servers.\n\nFor each IP address or " - "IP:port pair in , adds the address to the set of excluded servers then waits until all " - "database state has been safely moved away from the specified servers. If 'no_wait' is set, the " + "exclude [FORCE] [failed] [no_wait] [] [locality_dcid:] " + "[locality_zoneid:] [locality_machineid:] " + "[locality_processid:] or any locality data", + "exclude servers from the database either with IP address match or locality match", + "If no addresses or localities are specified, lists the set of excluded addresses and localities."
+ "\n\nFor each IP address or IP:port pair in or any LocalityData attributes (like dcid, zoneid, " + "machineid, processid), adds the address/locality to the set of excluded servers and localities then waits " + "until all database state has been safely moved away from the specified servers. If 'no_wait' is set, the " "command returns \nimmediately without checking if the exclusions have completed successfully.\n" "If 'FORCE' is set, the command does not perform safety checks before excluding.\n" "If 'failed' is set, the transaction log queue is dropped pre-emptively before waiting\n" "for data movement to finish and the server cannot be included again."); - helpMap["include"] = - CommandHelp("include all|", - "permit previously-excluded servers to rejoin the database", - "If `all' is specified, the excluded servers list is cleared.\n\nFor each IP address or IP:port " - "pair in , removes any matching exclusions from the excluded servers list. (A " - "specified IP will match all IP:* exclusion entries)"); + helpMap["include"] = CommandHelp( + "include all|[] [locality_dcid:] [locality_zoneid:] " + "[locality_machineid:] [locality_processid:] or any locality data", + "permit previously-excluded servers and localities to rejoin the database", + "If `all' is specified, the excluded servers and localities list is cleared.\n\nFor each IP address or IP:port " + "pair in or any LocalityData (like dcid, zoneid, machineid, processid), removes any " + "matching exclusions from the excluded servers and localities list. " + "(A specified IP will match all IP:* exclusion entries)"); helpMap["setclass"] = CommandHelp("setclass [
]", "change the class of a process", @@ -2388,19 +2393,26 @@ ACTOR Future coordinators(Database db, std::vector tokens, bool return err; } +// Includes the servers that could be IP addresses or localities back to the cluster. ACTOR Future include(Database db, std::vector tokens) { std::vector addresses; - bool failed = false; - bool all = false; + state std::vector localities; + state bool failed = false; + state bool all = false; for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) { if (*t == LiteralStringRef("all")) { all = true; } else if (*t == LiteralStringRef("failed")) { failed = true; + } else if (t->startsWith(LocalityData::ExcludeLocalityPrefix) && t->toString().find(':') != std::string::npos) { + // if the token starts with 'locality_' prefix. + localities.push_back(t->toString()); } else { auto a = AddressExclusion::parse(*t); if (!a.isValid()) { - fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); + fprintf(stderr, + "ERROR: '%s' is neither a valid network endpoint address nor a locality\n", + t->toString().c_str()); if (t->toString().find(":tls") != std::string::npos) printf(" Do not include the `:tls' suffix when naming a process\n"); return true; @@ -2412,8 +2424,15 @@ ACTOR Future include(Database db, std::vector tokens) { std::vector includeAll; includeAll.push_back(AddressExclusion()); wait(makeInterruptable(includeServers(db, includeAll, failed))); + wait(makeInterruptable(includeLocalities(db, localities, failed, all))); } else { - wait(makeInterruptable(includeServers(db, addresses, failed))); + if (!addresses.empty()) { + wait(makeInterruptable(includeServers(db, addresses, failed))); + } + if (!localities.empty()) { + // includes the servers that belong to given localities. 
+ wait(makeInterruptable(includeLocalities(db, localities, failed, all))); + } } return false; }; @@ -2423,16 +2442,25 @@ ACTOR Future exclude(Database db, Reference ccf, Future warn) { if (tokens.size() <= 1) { - vector excl = wait(makeInterruptable(getExcludedServers(db))); - if (!excl.size()) { - printf("There are currently no servers excluded from the database.\n" + state Future> fexclAddresses = makeInterruptable(getExcludedServers(db)); + state Future> fexclLocalities = makeInterruptable(getExcludedLocalities(db)); + + wait(success(fexclAddresses) && success(fexclLocalities)); + vector exclAddresses = fexclAddresses.get(); + vector exclLocalities = fexclLocalities.get(); + + if (!exclAddresses.size() && !exclLocalities.size()) { + printf("There are currently no servers or localities excluded from the database.\n" "To learn how to exclude a server, type `help exclude'.\n"); return false; } - printf("There are currently %zu servers or processes being excluded from the database:\n", excl.size()); - for (const auto& e : excl) + printf("There are currently %zu servers or localities being excluded from the database:\n", + exclAddresses.size() + exclLocalities.size()); + for (const auto& e : exclAddresses) printf(" %s\n", e.toString().c_str()); + for (const auto& e : exclLocalities) + printf(" %s\n", e.c_str()); printf("To find out whether it is safe to remove one or more of these\n" "servers from the cluster, type `exclude '.\n" @@ -2442,9 +2470,13 @@ ACTOR Future exclude(Database db, } else { state std::vector exclusionVector; state std::set exclusionSet; - bool force = false; + state std::vector exclusionAddresses; + state std::unordered_set exclusionLocalities; + state std::vector noMatchLocalities; + state bool force = false; state bool waitForAllExcluded = true; state bool markFailed = false; + state std::vector workers = wait(makeInterruptable(getWorkers(db))); for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) { if (*t == LiteralStringRef("FORCE")) { 
force = true; @@ -2452,19 +2484,38 @@ ACTOR Future exclude(Database db, waitForAllExcluded = false; } else if (*t == LiteralStringRef("failed")) { markFailed = true; + } else if (t->startsWith(LocalityData::ExcludeLocalityPrefix) && + t->toString().find(':') != std::string::npos) { + std::set localityAddresses = getAddressesByLocality(workers, t->toString()); + if (localityAddresses.empty()) { + noMatchLocalities.push_back(t->toString()); + } else { + // Add all the server IP addresses that belong to the given localities to the exclusion set. + exclusionVector.insert(exclusionVector.end(), localityAddresses.begin(), localityAddresses.end()); + exclusionSet.insert(localityAddresses.begin(), localityAddresses.end()); + } + exclusionLocalities.insert(t->toString()); } else { auto a = AddressExclusion::parse(*t); if (!a.isValid()) { - fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str()); + fprintf(stderr, + "ERROR: '%s' is neither a valid network endpoint address nor a locality\n", + t->toString().c_str()); if (t->toString().find(":tls") != std::string::npos) printf(" Do not include the `:tls' suffix when naming a process\n"); return true; } exclusionVector.push_back(a); exclusionSet.insert(a); + exclusionAddresses.push_back(a); } } + if (exclusionAddresses.empty() && exclusionLocalities.empty()) { + fprintf(stderr, "ERROR: At least one valid network endpoint address or locality must be provided\n"); + return true; + } + if (!force) { if (markFailed) { state bool safe; @@ -2576,7 +2627,12 @@ ACTOR Future exclude(Database db, } } - wait(makeInterruptable(excludeServers(db, exclusionVector, markFailed))); + if (!exclusionAddresses.empty()) { + wait(makeInterruptable(excludeServers(db, exclusionAddresses, markFailed))); + } + if (!exclusionLocalities.empty()) { + wait(makeInterruptable(excludeLocalities(db, exclusionLocalities, markFailed))); + } if (waitForAllExcluded) { printf("Waiting for state to be removed from all excluded
servers. This may take a while.\n"); @@ -2588,7 +2644,6 @@ ACTOR Future exclude(Database db, state std::set notExcludedServers = wait(makeInterruptable(checkForExcludingServers(db, exclusionVector, waitForAllExcluded))); - std::vector workers = wait(makeInterruptable(getWorkers(db))); std::map> workerPorts; for (auto addr : workers) workerPorts[addr.address.ip].insert(addr.address.port); @@ -2643,6 +2698,14 @@ ACTOR Future exclude(Database db, } } + for (const auto& locality : noMatchLocalities) { + fprintf( + stderr, + " %s ---- WARNING: Currently no servers found with this locality match! Be sure that you excluded " + "the correct locality.\n", + locality.c_str()); + } + bool foundCoordinator = false; auto ccs = ClusterConnectionFile(ccf->getFilename()).getConnectionString(); for (const auto& c : ccs.coordinators()) { diff --git a/fdbclient/DatabaseConfiguration.cpp b/fdbclient/DatabaseConfiguration.cpp index db99a2d780..e0817fcdf5 100644 --- a/fdbclient/DatabaseConfiguration.cpp +++ b/fdbclient/DatabaseConfiguration.cpp @@ -632,6 +632,57 @@ std::set DatabaseConfiguration::getExcludedServers() const { return addrs; } +// checks if the locality is excluded or not by checking if the key is present. +bool DatabaseConfiguration::isExcludedLocality(const LocalityData& locality) const { + std::map localityData = locality.getAllData(); + for (const auto& l : localityData) { + if (get(StringRef(encodeExcludedLocalityKey(LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + + l.second))) + .present() || + get(StringRef( + encodeFailedLocalityKey(LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second))) + .present()) { + return true; + } + } + + return false; +} + +// checks if this machineid of given locality is excluded. 
+bool DatabaseConfiguration::isMachineExcluded(const LocalityData& locality) const { + if (locality.machineId().present()) { + return get(encodeExcludedLocalityKey(LocalityData::ExcludeLocalityKeyMachineIdPrefix.toString() + + locality.machineId().get().toString())) + .present() || + get(encodeFailedLocalityKey(LocalityData::ExcludeLocalityKeyMachineIdPrefix.toString() + + locality.machineId().get().toString())) + .present(); + } + + return false; +} + +// Gets the list of already excluded localities (with failed option) +std::set DatabaseConfiguration::getExcludedLocalities() const { + // TODO: revisit all const_cast usages + const_cast(this)->makeConfigurationImmutable(); + std::set localities; + for (auto i = lower_bound(rawConfiguration, excludedLocalityKeys.begin); + i != rawConfiguration.end() && i->key < excludedLocalityKeys.end; + ++i) { + std::string l = decodeExcludedLocalityKey(i->key); + localities.insert(l); + } + for (auto i = lower_bound(rawConfiguration, failedLocalityKeys.begin); + i != rawConfiguration.end() && i->key < failedLocalityKeys.end; + ++i) { + std::string l = decodeFailedLocalityKey(i->key); + localities.insert(l); + } + return localities; +} + void DatabaseConfiguration::makeConfigurationMutable() { if (mutableConfiguration.present()) return; diff --git a/fdbclient/DatabaseConfiguration.h b/fdbclient/DatabaseConfiguration.h index 0df45ce228..1d0ceab5c3 100644 --- a/fdbclient/DatabaseConfiguration.h +++ b/fdbclient/DatabaseConfiguration.h @@ -248,7 +248,10 @@ struct DatabaseConfiguration { // Excluded servers (no state should be here) bool isExcludedServer(NetworkAddressList) const; + bool isExcludedLocality(const LocalityData& locality) const; + bool isMachineExcluded(const LocalityData& locality) const; std::set getExcludedServers() const; + std::set getExcludedLocalities() const; int32_t getDesiredCommitProxies() const { if (commitProxyCount == -1) diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index ab4ac07e00..25e31d1134 
100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -25,6 +25,7 @@ #include #include #include +#include #include "flow/Arena.h" #include "flow/flow.h" @@ -244,6 +245,11 @@ std::string describe(std::vector const& items, int max_items = -1) { return describeList(items, max_items); } +template +std::string describe(std::unordered_set const& items, int max_items = -1) { + return describeList(items, max_items); +} + template struct Traceable> : std::true_type { static std::string toString(const std::vector& value) { return describe(value); } diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 1cc1dde6ce..18de758b78 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -40,6 +40,7 @@ #include "fdbrpc/ReplicationPolicy.h" #include "fdbrpc/Replication.h" #include "flow/actorcompiler.h" // This must be the last #include. +#include "fdbclient/Schemas.h" bool isInteger(const std::string& s) { if (s.empty()) @@ -1576,6 +1577,7 @@ ACTOR Future excludeServers(Database cx, vector servers, wait(ryw.commit()); return Void(); } catch (Error& e) { + TraceEvent("ExcludeServersError").error(e, true); wait(ryw.onError(e)); } } @@ -1587,6 +1589,70 @@ ACTOR Future excludeServers(Database cx, vector servers, wait(tr.commit()); return Void(); } catch (Error& e) { + TraceEvent("ExcludeServersError").error(e, true); + wait(tr.onError(e)); + } + } + } +} + +// excludes localities by setting the keys in api version below 7.0 +void excludeLocalities(Transaction& tr, std::unordered_set localities, bool failed) { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); + std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString(); + auto localityVersionKey = failed ? 
failedLocalityVersionKey : excludedLocalityVersionKey; + tr.addReadConflictRange(singleKeyRange(localityVersionKey)); // To conflict with parallel includeLocalities + tr.set(localityVersionKey, excludeVersionKey); + for (const auto& l : localities) { + if (failed) { + tr.set(encodeFailedLocalityKey(l), StringRef()); + } else { + tr.set(encodeExcludedLocalityKey(l), StringRef()); + } + } + TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(localities)).detail("ExcludeFailed", failed); +} + +// Exclude the servers matching the given set of localities from use as state servers. +// excludes localities by setting the keys. +ACTOR Future excludeLocalities(Database cx, std::unordered_set localities, bool failed) { + if (cx->apiVersionAtLeast(700)) { + state ReadYourWritesTransaction ryw(cx); + loop { + try { + ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey( + failed ? "failed_locality" : "excluded_locality", "force"), + ValueRef()); + for (const auto& l : localities) { + Key addr = failed + ? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l) + : SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l); + ryw.set(addr, ValueRef()); + } + TraceEvent("ExcludeLocalitiesSpecialKeySpaceCommit") + .detail("Localities", describe(localities)) + .detail("ExcludeFailed", failed); + + wait(ryw.commit()); + return Void(); + } catch (Error& e) { + TraceEvent("ExcludeLocalitiesError").error(e, true); + wait(ryw.onError(e)); + } + } + } else { + state Transaction tr(cx); + loop { + try { + excludeLocalities(tr, localities, failed); + wait(tr.commit()); + return Void(); + } catch (Error& e) { + TraceEvent("ExcludeLocalitiesError").error(e, true); wait(tr.onError(e)); } } @@ -1694,6 +1760,92 @@ ACTOR Future includeServers(Database cx, vector servers, } } +// Remove the given localities from the exclusion list. 
+// include localities by clearing the keys. +ACTOR Future includeLocalities(Database cx, vector localities, bool failed, bool includeAll) { + state std::string versionKey = deterministicRandom()->randomUniqueID().toString(); + if (cx->apiVersionAtLeast(700)) { + state ReadYourWritesTransaction ryw(cx); + loop { + try { + ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + if (includeAll) { + if (failed) { + ryw.clear(SpecialKeySpace::getManamentApiCommandRange("failedlocality")); + } else { + ryw.clear(SpecialKeySpace::getManamentApiCommandRange("excludedlocality")); + } + } else { + for (const auto& l : localities) { + Key locality = + failed ? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l) + : SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l); + ryw.clear(locality); + } + } + TraceEvent("IncludeLocalitiesCommit") + .detail("Localities", describe(localities)) + .detail("Failed", failed) + .detail("IncludeAll", includeAll); + + wait(ryw.commit()); + return Void(); + } catch (Error& e) { + TraceEvent("IncludeLocalitiesError").error(e, true); + wait(ryw.onError(e)); + } + } + } else { + state Transaction tr(cx); + loop { + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); + + // includeLocalities might be used in an emergency transaction, so make sure it is + // retry-self-conflicting and CAUSAL_WRITE_RISKY + tr.setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY); + if (failed) { + tr.addReadConflictRange(singleKeyRange(failedLocalityVersionKey)); + tr.set(failedLocalityVersionKey, versionKey); + } else { + tr.addReadConflictRange(singleKeyRange(excludedLocalityVersionKey)); + tr.set(excludedLocalityVersionKey, versionKey); + } + + if (includeAll) { + if (failed) { + 
tr.clear(failedLocalityKeys); + } else { + tr.clear(excludedLocalityKeys); + } + } else { + for (const auto& l : localities) { + if (failed) { + tr.clear(encodeFailedLocalityKey(l)); + } else { + tr.clear(encodeExcludedLocalityKey(l)); + } + } + } + + TraceEvent("IncludeLocalitiesCommit") + .detail("Localities", describe(localities)) + .detail("Failed", failed) + .detail("IncludeAll", includeAll); + + wait(tr.commit()); + return Void(); + } catch (Error& e) { + TraceEvent("IncludeLocalitiesError").error(e, true); + wait(tr.onError(e)); + } + } + } +} + ACTOR Future setClass(Database cx, AddressExclusion server, ProcessClass processClass) { state Transaction tr(cx); @@ -1765,6 +1917,72 @@ ACTOR Future> getExcludedServers(Database cx) { } } +// Get the current list of excluded localities by reading the keys. +ACTOR Future> getExcludedLocalities(Transaction* tr) { + state RangeResult r = wait(tr->getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!r.more && r.size() < CLIENT_KNOBS->TOO_MANY); + state RangeResult r2 = wait(tr->getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!r2.more && r2.size() < CLIENT_KNOBS->TOO_MANY); + + vector excludedLocalities; + for (const auto& i : r) { + auto a = decodeExcludedLocalityKey(i.key); + excludedLocalities.push_back(a); + } + for (const auto& i : r2) { + auto a = decodeFailedLocalityKey(i.key); + excludedLocalities.push_back(a); + } + uniquify(excludedLocalities); + return excludedLocalities; +} + +// Get the list of excluded localities by reading the keys. 
+ACTOR Future> getExcludedLocalities(Database cx) { + state Transaction tr(cx); + loop { + try { + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + vector exclusions = wait(getExcludedLocalities(&tr)); + return exclusions; + } catch (Error& e) { + wait(tr.onError(e)); + } + } +} + +// Decodes the locality string to a pair of locality prefix and its value. +// The prefix could be dcid, zoneid, machineid, or processid. +std::pair decodeLocality(const std::string& locality) { + StringRef localityRef(locality.c_str()); + std::string localityKeyValue = localityRef.removePrefix(LocalityData::ExcludeLocalityPrefix).toString(); + int split = localityKeyValue.find(':'); + if (split != std::string::npos) { + return std::make_pair(localityKeyValue.substr(0, split), localityKeyValue.substr(split + 1)); + } + + return std::make_pair("", ""); +} + +// Returns the list of IPAddresses of the workers that match the given locality. +// Example: locality="dcid:primary" returns all the ip addresses of the workers in the primary dc.
+std::set getAddressesByLocality(const std::vector& workers, + const std::string& locality) { + std::pair localityKeyValue = decodeLocality(locality); + + std::set localityAddresses; + for (int i = 0; i < workers.size(); i++) { + if (workers[i].locality.isPresent(localityKeyValue.first) && + workers[i].locality.get(localityKeyValue.first) == localityKeyValue.second) { + localityAddresses.insert(AddressExclusion(workers[i].address.ip, workers[i].address.port)); + } + } + + return localityAddresses; +} + ACTOR Future printHealthyZone(Database cx) { state Transaction tr(cx); loop { diff --git a/fdbclient/ManagementAPI.actor.h b/fdbclient/ManagementAPI.actor.h index b66419c7ad..4cf46a3cf8 100644 --- a/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/ManagementAPI.actor.h @@ -163,10 +163,21 @@ Reference nameQuorumChange(std::string const& name, Reference excludeServers(Database cx, vector servers, bool failed = false); void excludeServers(Transaction& tr, vector& servers, bool failed = false); +// Exclude the servers matching the given set of localities from use as state servers. Returns as soon as the change +// is durable, without necessarily waiting for the servers to be evacuated. +ACTOR Future excludeLocalities(Database cx, std::unordered_set localities, bool failed = false); +void excludeLocalities(Transaction& tr, std::unordered_set localities, bool failed = false); + // Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given // IP. A NetworkAddress() means all servers (don't exclude anything) ACTOR Future includeServers(Database cx, vector servers, bool failed = false); +// Remove the given localities from the exclusion list. +ACTOR Future includeLocalities(Database cx, + vector localities, + bool failed = false, + bool includeAll = false); + // Set the process class of processes with the given address. A NetworkAddress with a port of 0 means all servers on // the given IP. 
ACTOR Future setClass(Database cx, AddressExclusion server, ProcessClass processClass); @@ -175,6 +186,12 @@ ACTOR Future setClass(Database cx, AddressExclusion server, ProcessClass p ACTOR Future> getExcludedServers(Database cx); ACTOR Future> getExcludedServers(Transaction* tr); +// Get the current list of excluded localities +ACTOR Future> getExcludedLocalities(Database cx); +ACTOR Future> getExcludedLocalities(Transaction* tr); + +std::set getAddressesByLocality(const std::vector& workers, const std::string& locality); + // Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is // true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and // unless any of them are explicitly included with includeServers() diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index e09c1915f7..6343169a6a 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1159,6 +1159,14 @@ DatabaseContext::DatabaseContext(Reference(SpecialKeySpace::getManamentApiCommandRange("failed"))); + registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + SpecialKeySpace::getManamentApiCommandRange("excludedlocality"))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::MANAGEMENT, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique(SpecialKeySpace::getManamentApiCommandRange("failedlocality"))); registerSpecialKeySpaceModule( SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/Schemas.cpp b/fdbclient/Schemas.cpp index 2e4ec84dbd..0cecd29250 100644 --- a/fdbclient/Schemas.cpp +++ b/fdbclient/Schemas.cpp @@ -747,7 +747,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema( "coordinators_count":1, "excluded_servers":[ { - "address":"10.0.4.1" + "address":"10.0.4.1", + 
"locality":"locality_processid:e9816ca4a89ff64ddb7ba2a5ec10b75b" } ], "auto_commit_proxies":3, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index d51d27a23c..fe1a2d5409 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -77,6 +77,12 @@ std::unordered_map SpecialKeySpace::managementApiCommandT { "failed", KeyRangeRef(LiteralStringRef("failed/"), LiteralStringRef("failed0")) .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, + { "excludedlocality", + KeyRangeRef(LiteralStringRef("excluded_locality/"), LiteralStringRef("excluded_locality0")) + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, + { "failedlocality", + KeyRangeRef(LiteralStringRef("failed_locality/"), LiteralStringRef("failed_locality0")) + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, { "lock", singleKeyRange(LiteralStringRef("db_locked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, { "consistencycheck", singleKeyRange(LiteralStringRef("consistency_check_suspended")) @@ -98,7 +104,10 @@ std::unordered_map SpecialKeySpace::managementApiCommandT .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } }; -std::set SpecialKeySpace::options = { "excluded/force", "failed/force" }; +std::set SpecialKeySpace::options = { "excluded/force", + "failed/force", + "excluded_locality/force", + "failed_locality/force" }; std::set SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey }; @@ -2110,3 +2119,158 @@ Future> DataDistributionImpl::commit(ReadYourWritesTransac } return msg; } + +// Clears the special management api keys excludeLocality and failedLocality. 
+void includeLocalities(ReadYourWritesTransaction* ryw) {
+	ryw->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+	ryw->setOption(FDBTransactionOptions::LOCK_AWARE);
+	ryw->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
+	// includeLocalities might be used in an emergency transaction, so make sure it is retry-self-conflicting and
+	// CAUSAL_WRITE_RISKY
+	ryw->setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY);
+	std::string versionKey = deterministicRandom()->randomUniqueID().toString();
+	// for excluded localities
+	auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(
+	    SpecialKeySpace::getManamentApiCommandRange("excludedlocality"));
+	Transaction& tr = ryw->getTransaction();
+	for (auto& iter : ranges) {
+		auto entry = iter.value();
+		if (entry.first && !entry.second.present()) {
+			tr.addReadConflictRange(singleKeyRange(excludedLocalityVersionKey));
+			tr.set(excludedLocalityVersionKey, versionKey);
+			tr.clear(ryw->getDatabase()->specialKeySpace->decode(iter.range()));
+		}
+	}
+	// for failed localities
+	ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(
+	    SpecialKeySpace::getManamentApiCommandRange("failedlocality"));
+	for (auto& iter : ranges) {
+		auto entry = iter.value();
+		if (entry.first && !entry.second.present()) {
+			tr.addReadConflictRange(singleKeyRange(failedLocalityVersionKey));
+			tr.set(failedLocalityVersionKey, versionKey);
+			tr.clear(ryw->getDatabase()->specialKeySpace->decode(iter.range()));
+		}
+	}
+}
+
+// Reads the excludedlocality and failedlocality keys using the management API,
+// parses them, and returns the list.
+bool parseLocalitiesFromKeys(ReadYourWritesTransaction* ryw,
+                             bool failed,
+                             std::unordered_set<std::string>& localities,
+                             std::vector<AddressExclusion>& addresses,
+                             std::set<AddressExclusion>& exclusions,
+                             std::vector<ProcessData>& workers,
+                             Optional<std::string>& msg) {
+	KeyRangeRef range = failed ?
SpecialKeySpace::getManamentApiCommandRange("failedlocality")
+	                          : SpecialKeySpace::getManamentApiCommandRange("excludedlocality");
+	auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
+	auto iter = ranges.begin();
+	while (iter != ranges.end()) {
+		auto entry = iter->value();
+		// only check for exclude(set) operations; include(clear) operations are not checked
+		TraceEvent(SevDebug, "ParseLocalities")
+		    .detail("Valid", entry.first)
+		    .detail("Set", entry.second.present())
+		    .detail("Key", iter->begin().toString());
+		if (entry.first && entry.second.present()) {
+			Key locality = iter->begin().removePrefix(range.begin);
+			if (locality.startsWith(LocalityData::ExcludeLocalityPrefix) &&
+			    locality.toString().find(':') != std::string::npos) {
+				std::set<AddressExclusion> localityAddresses = getAddressesByLocality(workers, locality.toString());
+				if (!localityAddresses.empty()) {
+					std::copy(localityAddresses.begin(), localityAddresses.end(), back_inserter(addresses));
+					exclusions.insert(localityAddresses.begin(), localityAddresses.end());
+				}
+
+				localities.insert(locality.toString());
+			} else {
+				std::string error = "ERROR: \'" + locality.toString() + "\' is not a valid locality\n";
+				msg = ManagementAPIError::toJsonString(
+				    false, entry.second.present() ? (failed ? "exclude failed" : "exclude") : "include", error);
+				return false;
+			}
+		}
+		++iter;
+	}
+	return true;
+}
+
+// On commit, parses the special exclusion keys to get the localities to be excluded, runs the safety check,
+// and adds them to the exclusion list. Also clears the special management API keys with includeLocalities.
+ACTOR Future<Optional<std::string>> excludeLocalityCommitActor(ReadYourWritesTransaction* ryw, bool failed) {
+	state Optional<std::string> result;
+	state std::unordered_set<std::string> localities;
+	state std::vector<AddressExclusion> addresses;
+	state std::set<AddressExclusion> exclusions;
+	state std::vector<ProcessData> workers = wait(getWorkers(&ryw->getTransaction()));
+	if (!parseLocalitiesFromKeys(ryw, failed, localities, addresses, exclusions, workers, result))
+		return result;
+	// If the force option is not set, we need to do a safety check
+	auto force = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
+	    failed ? "failed_locality" : "excluded_locality", "force")];
+	// only do the safety check when we have localities to be excluded and the force option key is not set
+	if (localities.size() && !(force.first && force.second.present())) {
+		bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, failed, &result));
+		if (!safe)
+			return result;
+	}
+
+	excludeLocalities(ryw->getTransaction(), localities, failed);
+	includeLocalities(ryw);
+
+	return result;
+}
+
+ExcludedLocalitiesRangeImpl::ExcludedLocalitiesRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
+
+Future<RangeResult> ExcludedLocalitiesRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
+	return rwModuleWithMappingGetRangeActor(ryw, this, kr);
+}
+
+void ExcludedLocalitiesRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
+	// ignore value
+	ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(ValueRef())));
+}
+
+Key ExcludedLocalitiesRangeImpl::decode(const KeyRef& key) const {
+	return key.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
+	    .withPrefix(LiteralStringRef("\xff/conf/"));
+}
+
+Key ExcludedLocalitiesRangeImpl::encode(const KeyRef& key) const {
+	return key.removePrefix(LiteralStringRef("\xff/conf/"))
+	    .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
+}
+
+Future> ExcludedLocalitiesRangeImpl::commit(ReadYourWritesTransaction* ryw) { + // exclude locality with failed option as false. + return excludeLocalityCommitActor(ryw, false); +} + +FailedLocalitiesRangeImpl::FailedLocalitiesRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +Future FailedLocalitiesRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + return rwModuleWithMappingGetRangeActor(ryw, this, kr); +} + +void FailedLocalitiesRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + // ignore value + ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional(ValueRef()))); +} + +Key FailedLocalitiesRangeImpl::decode(const KeyRef& key) const { + return key.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin) + .withPrefix(LiteralStringRef("\xff/conf/")); +} + +Key FailedLocalitiesRangeImpl::encode(const KeyRef& key) const { + return key.removePrefix(LiteralStringRef("\xff/conf/")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin); +} + +Future> FailedLocalitiesRangeImpl::commit(ReadYourWritesTransaction* ryw) { + // exclude locality with failed option as true. 
+ return excludeLocalityCommitActor(ryw, true); +} diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 084135bfb6..8076c320b9 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -280,6 +280,28 @@ public: Future> commit(ReadYourWritesTransaction* ryw) override; }; +// Special key management api for excluding localities (exclude_locality) +class ExcludedLocalitiesRangeImpl : public SpecialKeyRangeRWImpl { +public: + explicit ExcludedLocalitiesRangeImpl(KeyRangeRef kr); + Future getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override; + Key decode(const KeyRef& key) const override; + Key encode(const KeyRef& key) const override; + Future> commit(ReadYourWritesTransaction* ryw) override; +}; + +// Special key management api for excluding localities with failed option (failed_locality) +class FailedLocalitiesRangeImpl : public SpecialKeyRangeRWImpl { +public: + explicit FailedLocalitiesRangeImpl(KeyRangeRef kr); + Future getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override; + Key decode(const KeyRef& key) const override; + Key encode(const KeyRef& key) const override; + Future> commit(ReadYourWritesTransaction* ryw) override; +}; + class ExcludeServersRangeImpl : public SpecialKeyRangeRWImpl { public: explicit ExcludeServersRangeImpl(KeyRangeRef kr); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index c447f0a0c1..a7a6396039 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -634,7 +634,7 @@ const KeyRef triggerDDTeamInfoPrintKey(LiteralStringRef("\xff/triggerDDTeamInfoP const KeyRangeRef excludedServersKeys(LiteralStringRef("\xff/conf/excluded/"), LiteralStringRef("\xff/conf/excluded0")); const KeyRef excludedServersPrefix = 
excludedServersKeys.begin; const KeyRef excludedServersVersionKey = LiteralStringRef("\xff/conf/excluded"); -const AddressExclusion decodeExcludedServersKey(KeyRef const& key) { +AddressExclusion decodeExcludedServersKey(KeyRef const& key) { ASSERT(key.startsWith(excludedServersPrefix)); // Returns an invalid NetworkAddress if given an invalid key (within the prefix) // Excluded servers have IP in x.x.x.x format, port optional, and no SSL suffix @@ -648,10 +648,22 @@ std::string encodeExcludedServersKey(AddressExclusion const& addr) { return excludedServersPrefix.toString() + addr.toString(); } +const KeyRangeRef excludedLocalityKeys(LiteralStringRef("\xff/conf/excluded_locality/"), + LiteralStringRef("\xff/conf/excluded_locality0")); +const KeyRef excludedLocalityPrefix = excludedLocalityKeys.begin; +const KeyRef excludedLocalityVersionKey = LiteralStringRef("\xff/conf/excluded_locality"); +std::string decodeExcludedLocalityKey(KeyRef const& key) { + ASSERT(key.startsWith(excludedLocalityPrefix)); + return key.removePrefix(excludedLocalityPrefix).toString(); +} +std::string encodeExcludedLocalityKey(std::string const& locality) { + return excludedLocalityPrefix.toString() + locality; +} + const KeyRangeRef failedServersKeys(LiteralStringRef("\xff/conf/failed/"), LiteralStringRef("\xff/conf/failed0")); const KeyRef failedServersPrefix = failedServersKeys.begin; const KeyRef failedServersVersionKey = LiteralStringRef("\xff/conf/failed"); -const AddressExclusion decodeFailedServersKey(KeyRef const& key) { +AddressExclusion decodeFailedServersKey(KeyRef const& key) { ASSERT(key.startsWith(failedServersPrefix)); // Returns an invalid NetworkAddress if given an invalid key (within the prefix) // Excluded servers have IP in x.x.x.x format, port optional, and no SSL suffix @@ -665,6 +677,18 @@ std::string encodeFailedServersKey(AddressExclusion const& addr) { return failedServersPrefix.toString() + addr.toString(); } +const KeyRangeRef 
failedLocalityKeys(LiteralStringRef("\xff/conf/failed_locality/"), + LiteralStringRef("\xff/conf/failed_locality0")); +const KeyRef failedLocalityPrefix = failedLocalityKeys.begin; +const KeyRef failedLocalityVersionKey = LiteralStringRef("\xff/conf/failed_locality"); +std::string decodeFailedLocalityKey(KeyRef const& key) { + ASSERT(key.startsWith(failedLocalityPrefix)); + return key.removePrefix(failedLocalityPrefix).toString(); +} +std::string encodeFailedLocalityKey(std::string const& locality) { + return failedLocalityPrefix.toString() + locality; +} + // const KeyRangeRef globalConfigKeys( LiteralStringRef("\xff/globalConfig/"), LiteralStringRef("\xff/globalConfig0") ); // const KeyRef globalConfigPrefix = globalConfigKeys.begin; diff --git a/fdbclient/SystemData.h b/fdbclient/SystemData.h index 922c8fccb4..5bdf88419c 100644 --- a/fdbclient/SystemData.h +++ b/fdbclient/SystemData.h @@ -224,9 +224,16 @@ extern const KeyRef excludedServersPrefix; extern const KeyRangeRef excludedServersKeys; extern const KeyRef excludedServersVersionKey; // The value of this key shall be changed by any transaction that // modifies the excluded servers list -const AddressExclusion decodeExcludedServersKey(KeyRef const& key); // where key.startsWith(excludedServersPrefix) +AddressExclusion decodeExcludedServersKey(KeyRef const& key); // where key.startsWith(excludedServersPrefix) std::string encodeExcludedServersKey(AddressExclusion const&); +extern const KeyRef excludedLocalityPrefix; +extern const KeyRangeRef excludedLocalityKeys; +extern const KeyRef excludedLocalityVersionKey; // The value of this key shall be changed by any transaction that + // modifies the excluded localities list +std::string decodeExcludedLocalityKey(KeyRef const& key); // where key.startsWith(excludedLocalityPrefix) +std::string encodeExcludedLocalityKey(std::string const&); + // "\xff/conf/failed/1.2.3.4" := "" // "\xff/conf/failed/1.2.3.4:4000" := "" // These are inside configKeysPrefix since they 
represent a form of configuration and they are convenient @@ -238,9 +245,16 @@ extern const KeyRef failedServersPrefix; extern const KeyRangeRef failedServersKeys; extern const KeyRef failedServersVersionKey; // The value of this key shall be changed by any transaction that modifies // the failed servers list -const AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix) +AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix) std::string encodeFailedServersKey(AddressExclusion const&); +extern const KeyRef failedLocalityPrefix; +extern const KeyRangeRef failedLocalityKeys; +extern const KeyRef failedLocalityVersionKey; // The value of this key shall be changed by any transaction that modifies + // the failed localities list +std::string decodeFailedLocalityKey(KeyRef const& key); // where key.startsWith(failedLocalityPrefix) +std::string encodeFailedLocalityKey(std::string const&); + // "\xff/globalConfig/[[option]]" := "value" // An umbrella prefix for global configuration data synchronized to all nodes. 
// extern const KeyRangeRef globalConfigData; diff --git a/fdbrpc/Locality.cpp b/fdbrpc/Locality.cpp index 8cdc0751c4..b897322f7b 100644 --- a/fdbrpc/Locality.cpp +++ b/fdbrpc/Locality.cpp @@ -26,6 +26,8 @@ const StringRef LocalityData::keyZoneId = LiteralStringRef("zoneid"); const StringRef LocalityData::keyDcId = LiteralStringRef("dcid"); const StringRef LocalityData::keyMachineId = LiteralStringRef("machineid"); const StringRef LocalityData::keyDataHallId = LiteralStringRef("data_hall"); +const StringRef LocalityData::ExcludeLocalityKeyMachineIdPrefix = LiteralStringRef("locality_machineid:"); +const StringRef LocalityData::ExcludeLocalityPrefix = LiteralStringRef("locality_"); ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const { switch (role) { diff --git a/fdbrpc/Locality.h b/fdbrpc/Locality.h index 0a7467e0bf..11e1e27d8d 100644 --- a/fdbrpc/Locality.h +++ b/fdbrpc/Locality.h @@ -308,7 +308,19 @@ public: } } + std::map getAllData() const { + std::map data; + for (const auto& d : _data) { + if (d.second.present()) { + data[d.first.toString()] = d.second.get().toString(); + } + } + return data; + } + static const UID UNSET_ID; + static const StringRef ExcludeLocalityKeyMachineIdPrefix; + static const StringRef ExcludeLocalityPrefix; }; static std::string describe(std::vector const& items, StringRef const key, int max_items = -1) { diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index f7dc855d12..cea35794a9 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -213,7 +213,9 @@ void applyMetadataMutations(SpanID const& spanContext, .castTo()) { // FIXME: Make this check more specific, here or by reading // configuration whenever there is a change if ((!m.param1.startsWith(excludedServersPrefix) && m.param1 != excludedServersVersionKey) && - (!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey)) { + 
(!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey) && + (!m.param1.startsWith(excludedLocalityPrefix) && m.param1 != excludedLocalityVersionKey) && + (!m.param1.startsWith(failedLocalityPrefix) && m.param1 != failedLocalityVersionKey)) { auto t = txnStateStore->readValue(m.param1).get(); TraceEvent("MutationRequiresRestart", dbgid) .detail("M", m.toString()) @@ -427,7 +429,8 @@ void applyMetadataMutations(SpanID const& spanContext, if (configKeys.intersects(range)) { if (!initialCommit) txnStateStore->clear(range & configKeys); - if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range)) { + if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) && + !excludedLocalityKeys.contains(range) && !failedLocalityKeys.contains(range)) { TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString()); confChange = true; } diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 3dd6aceeec..a4553861e2 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3815,16 +3815,26 @@ ACTOR Future trackExcludedServers(DDTeamCollection* self) { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); state Future fresultsExclude = tr.getRange(excludedServersKeys, CLIENT_KNOBS->TOO_MANY); state Future fresultsFailed = tr.getRange(failedServersKeys, CLIENT_KNOBS->TOO_MANY); - wait(success(fresultsExclude) && success(fresultsFailed)); + state Future flocalitiesExclude = tr.getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY); + state Future flocalitiesFailed = tr.getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY); + state Future> fworkers = getWorkers(self->cx); + wait(success(fresultsExclude) && success(fresultsFailed) && success(flocalitiesExclude) && + success(flocalitiesFailed)); - RangeResult excludedResults = fresultsExclude.get(); + state RangeResult excludedResults = fresultsExclude.get(); 
ASSERT(!excludedResults.more && excludedResults.size() < CLIENT_KNOBS->TOO_MANY); - RangeResult failedResults = fresultsFailed.get(); + state RangeResult failedResults = fresultsFailed.get(); ASSERT(!failedResults.more && failedResults.size() < CLIENT_KNOBS->TOO_MANY); - std::set excluded; - std::set failed; + state RangeResult excludedLocalityResults = flocalitiesExclude.get(); + ASSERT(!excludedLocalityResults.more && excludedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY); + + state RangeResult failedLocalityResults = flocalitiesFailed.get(); + ASSERT(!failedLocalityResults.more && failedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY); + + state std::set excluded; + state std::set failed; for (const auto& r : excludedResults) { AddressExclusion addr = decodeExcludedServersKey(r.key); if (addr.isValid()) { @@ -3838,6 +3848,19 @@ ACTOR Future trackExcludedServers(DDTeamCollection* self) { } } + wait(success(fworkers)); + std::vector workers = fworkers.get(); + for (const auto& r : excludedLocalityResults) { + std::string locality = decodeExcludedLocalityKey(r.key); + std::set localityExcludedAddresses = getAddressesByLocality(workers, locality); + excluded.insert(localityExcludedAddresses.begin(), localityExcludedAddresses.end()); + } + for (const auto& r : failedLocalityResults) { + std::string locality = decodeFailedLocalityKey(r.key); + std::set localityFailedAddresses = getAddressesByLocality(workers, locality); + failed.insert(localityFailedAddresses.begin(), localityFailedAddresses.end()); + } + // Reset and reassign self->excludedServers based on excluded, but we only // want to trigger entries that are different // Do not retrigger and double-overwrite failed or wiggling servers @@ -3860,11 +3883,14 @@ ACTOR Future trackExcludedServers(DDTeamCollection* self) { } TraceEvent("DDExcludedServersChanged", self->distributorId) - .detail("RowsExcluded", excludedResults.size()) - .detail("RowsFailed", failedResults.size()); + .detail("AddressesExcluded", 
excludedResults.size()) + .detail("AddressesFailed", failedResults.size()) + .detail("LocalitiesExcluded", excludedLocalityResults.size()) + .detail("LocalitiesFailed", failedLocalityResults.size()); self->restartRecruiting.trigger(); - state Future watchFuture = tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey); + state Future watchFuture = tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey) || + tr.watch(excludedLocalityVersionKey) || tr.watch(failedLocalityVersionKey); wait(tr.commit()); wait(watchFuture); tr.reset(); diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index aadc7ff998..8103314235 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1032,6 +1032,13 @@ ACTOR Future> addStorageServer(Database cx, StorageServe ? tr->get(StringRef(encodeFailedServersKey(AddressExclusion(server.secondaryAddress().get().ip)))) : Future>(Optional()); + state vector>> localityExclusions; + std::map localityData = server.locality.getAllData(); + for (const auto& l : localityData) { + localityExclusions.push_back(tr->get(StringRef(encodeExcludedLocalityKey( + LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second)))); + } + state Future fTags = tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY, true); state Future fHistoryTags = tr->getRange(serverTagHistoryKeys, CLIENT_KNOBS->TOO_MANY, true); @@ -1039,13 +1046,23 @@ ACTOR Future> addStorageServer(Database cx, StorageServe success(fExclProc) && success(fExclIP) && success(fFailProc) && success(fFailIP) && success(fExclProc2) && success(fExclIP2) && success(fFailProc2) && success(fFailIP2)); - // If we have been added to the excluded/failed state servers list, we have to fail + for (const auto& exclusion : localityExclusions) { + wait(success(exclusion)); + } + + // If we have been added to the excluded/failed state servers or localities list, we have to fail if (fExclProc.get().present() || 
fExclIP.get().present() || fFailProc.get().present() || fFailIP.get().present() || fExclProc2.get().present() || fExclIP2.get().present() || fFailProc2.get().present() || fFailIP2.get().present()) { throw recruitment_failed(); } + for (const auto& exclusion : localityExclusions) { + if (exclusion.get().present()) { + throw recruitment_failed(); + } + } + if (fTagLocalities.get().more || fTags.get().more || fHistoryTags.get().more) ASSERT(false); diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index 4881b00914..804e4a537e 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -1394,9 +1394,10 @@ ACTOR Future configurationMonitor(RatekeeperData* self) { self->configuration.fromKeyValues((VectorRef)results); - state Future watchFuture = tr.watch(moveKeysLockOwnerKey) || - tr.watch(excludedServersVersionKey) || - tr.watch(failedServersVersionKey); + state Future watchFuture = + tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || + tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) || + tr.watch(failedLocalityVersionKey); wait(tr.commit()); wait(watchFuture); break; diff --git a/fdbserver/Status.actor.cpp b/fdbserver/Status.actor.cpp index 6182bb7d7d..ee4198c7b0 100644 --- a/fdbserver/Status.actor.cpp +++ b/fdbserver/Status.actor.cpp @@ -358,8 +358,15 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, // FIXME: this will not catch if the secondary address of the process was excluded NetworkAddressList tempList; tempList.address = it->first; - if (configuration.present() && !configuration.get().isExcludedServer(tempList)) - notExcludedMap[machineId] = false; + bool excludedServer = false; + bool excludedLocality = false; + if (configuration.present() && configuration.get().isExcludedServer(tempList)) + excludedServer = true; + if (locality.count(it->first) && configuration.present() && + configuration.get().isMachineExcluded(locality[it->first])) + 
excludedLocality = true; + + notExcludedMap[machineId] = excludedServer || excludedLocality; workerContribMap[machineId]++; } catch (Error&) { ++failed; @@ -1012,7 +1019,8 @@ ACTOR static Future processStatusFetcher( statusObj["roles"] = roles.getStatusForAddress(address); if (configuration.present()) { - statusObj["excluded"] = configuration.get().isExcludedServer(workerItr->interf.addresses()); + statusObj["excluded"] = configuration.get().isExcludedServer(workerItr->interf.addresses()) || + configuration.get().isExcludedLocality(workerItr->interf.locality); } statusObj["class_type"] = workerItr->processClass.toString(); @@ -1605,6 +1613,12 @@ static JsonBuilderObject configurationFetcher(Optional co statusObj["address"] = it->toString(); excludedServersArr.push_back(statusObj); } + std::set excludedLocalities = configuration.getExcludedLocalities(); + for (const auto& it : excludedLocalities) { + JsonBuilderObject statusObj; + statusObj["locality"] = it; + excludedServersArr.push_back(statusObj); + } statusObj["excluded_servers"] = excludedServersArr; } vector coordinatorLeaderServers = coordinators.clientLeaderServers; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 3981bbe2bc..4edcaa18cf 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -1537,9 +1537,10 @@ ACTOR Future configurationMonitor(Reference self, Database cx) self->registrationTrigger.trigger(); } - state Future watchFuture = tr.watch(moveKeysLockOwnerKey) || - tr.watch(excludedServersVersionKey) || - tr.watch(failedServersVersionKey); + state Future watchFuture = + tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) || + tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) || + tr.watch(failedLocalityVersionKey); wait(tr.commit()); wait(watchFuture); break; diff --git a/fdbserver/workloads/RemoveServersSafely.actor.cpp b/fdbserver/workloads/RemoveServersSafely.actor.cpp index 
c5fe4369da..255b75357c 100644
--- a/fdbserver/workloads/RemoveServersSafely.actor.cpp
+++ b/fdbserver/workloads/RemoveServersSafely.actor.cpp
@@ -75,7 +75,10 @@ struct RemoveServersSafelyWorkload : TestWorkload {
             TraceEvent("RemoveAndKill")
                 .detail("Step", "listAddresses")
                 .detail("Address", pAddr.toString())
-                .detail("Process", describe(*it));
+                .detail("Process", describe(*it))
+                .detail("Dcid", it->locality.dcId().get().toString())
+                .detail("Zoneid", it->locality.zoneId().get().toString())
+                .detail("MachineId", it->locality.machineId().get().toString());
             if (g_simulator.protectedAddresses.count(it->address) == 0)
                 processAddrs.push_back(pAddr);
@@ -335,6 +338,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
                                           double waitSeconds,
                                           std::set<AddressExclusion> toKill1,
                                           std::set<AddressExclusion> toKill2) {
+        wait(updateProcessIds(cx));
         wait(delay(waitSeconds));

         // Removing the first set of machines might legitimately bring the database down, so a timeout is not an error
@@ -374,6 +378,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {

         // Include the servers, if unable to exclude
         // Reinclude when buggify is on to increase the surface area of the next set of excludes
+        state bool failed = true;
         if (!bClearedFirst || BUGGIFY) {
             // Get the updated list of processes which may have changed due to reboots, deletes, etc
             TraceEvent("RemoveAndKill")
@@ -382,6 +387,8 @@ struct RemoveServersSafelyWorkload : TestWorkload {
                 .detail("ToKill", describe(toKill1))
                 .detail("ClusterAvailable", g_simulator.isAvailable());
             wait(includeServers(cx, vector<AddressExclusion>(1)));
+            wait(includeLocalities(cx, vector<std::string>(), failed, true));
+            wait(includeLocalities(cx, vector<std::string>(), !failed, true));
             self->includeAddresses(toKill1);
         }
@@ -418,17 +425,16 @@ struct RemoveServersSafelyWorkload : TestWorkload {
             .detail("ToKill", describe(toKill2))
             .detail("ClusterAvailable", g_simulator.isAvailable());

-        // Reinclude all of the machine, if buggified
-        if (BUGGIFY) {
-            // Get the updated list of processes which may have changed due to reboots, deletes, etc
-            TraceEvent("RemoveAndKill")
-                .detail("Step", "include all second")
-                .detail("KillTotal", toKill2.size())
-                .detail("ToKill", describe(toKill2))
-                .detail("ClusterAvailable", g_simulator.isAvailable());
-            wait(includeServers(cx, vector<AddressExclusion>(1)));
-            self->includeAddresses(toKill2);
-        }
+        // Get the updated list of processes which may have changed due to reboots, deletes, etc
+        TraceEvent("RemoveAndKill")
+            .detail("Step", "include all second")
+            .detail("KillTotal", toKill2.size())
+            .detail("ToKill", describe(toKill2))
+            .detail("ClusterAvailable", g_simulator.isAvailable());
+        wait(includeServers(cx, vector<AddressExclusion>(1)));
+        wait(includeLocalities(cx, vector<std::string>(), failed, true));
+        wait(includeLocalities(cx, vector<std::string>(), !failed, true));
+        self->includeAddresses(toKill2);

         return Void();
     }
@@ -516,7 +522,10 @@ struct RemoveServersSafelyWorkload : TestWorkload {
             .detail("Step", "Including all")
             .detail("ClusterAvailable", g_simulator.isAvailable())
             .detail("MarkExcludeAsFailed", markExcludeAsFailed);
+        state bool failed = true;
         wait(includeServers(cx, vector<AddressExclusion>(1)));
+        wait(includeLocalities(cx, vector<std::string>(), failed, true));
+        wait(includeLocalities(cx, vector<std::string>(), !failed, true));
         TraceEvent("RemoveAndKill", functionId)
             .detail("Step", "Included all")
             .detail("ClusterAvailable", g_simulator.isAvailable())
@@ -617,20 +626,60 @@ struct RemoveServersSafelyWorkload : TestWorkload {
             .detail("FailedAddresses", describe(toKillMarkFailedArray))
             .detail("ClusterAvailable", g_simulator.isAvailable())
             .detail("MarkExcludeAsFailed", markExcludeAsFailed);
+
+        state bool excludeLocalitiesInsteadOfServers = deterministicRandom()->coinflip();
         if (markExcludeAsFailed) {
-            wait(excludeServers(cx, toKillMarkFailedArray, true));
+            if (excludeLocalitiesInsteadOfServers) {
+                state std::unordered_set<std::string> toKillLocalitiesFailed =
+                    self->getLocalitiesFromAddresses(toKillMarkFailedArray);
+                TraceEvent("RemoveAndKill", functionId)
+                    .detail("Step", "Excluding localities with failed option")
+                    .detail("FailedAddressesSize", toKillMarkFailedArray.size())
+                    .detail("FailedAddresses", describe(toKillMarkFailedArray))
+                    .detail("FailedLocalitiesSize", toKillLocalitiesFailed.size())
+                    .detail("FailedLocalities", describe(toKillLocalitiesFailed));
+
+                wait(excludeLocalities(cx, toKillLocalitiesFailed, true));
+            } else {
+                TraceEvent("RemoveAndKill", functionId)
+                    .detail("Step", "Excluding servers with failed option")
+                    .detail("FailedAddressesSize", toKillMarkFailedArray.size())
+                    .detail("FailedAddresses", describe(toKillMarkFailedArray));
+
+                wait(excludeServers(cx, toKillMarkFailedArray, true));
+            }
+        }
+
+        if (excludeLocalitiesInsteadOfServers) {
+            state std::unordered_set<std::string> toKillLocalities = self->getLocalitiesFromAddresses(toKillArray);
+            TraceEvent("RemoveAndKill", functionId)
+                .detail("Step", "Excluding localities without failed option")
+                .detail("AddressesSize", toKillArray.size())
+                .detail("Addresses", describe(toKillArray))
+                .detail("LocalitiesSize", toKillLocalities.size())
+                .detail("Localities", describe(toKillLocalities));
+
+            wait(excludeLocalities(cx, toKillLocalities, false));
+        } else {
+            TraceEvent("RemoveAndKill", functionId)
+                .detail("Step", "Excluding servers without failed option")
+                .detail("AddressesSize", toKillArray.size())
+                .detail("Addresses", describe(toKillArray));
+
+            wait(excludeServers(cx, toKillArray));
         }
-        wait(excludeServers(cx, toKillArray));

         // We need to skip at least the quorum change if there's nothing to kill, because there might not be enough
         // servers left alive to do a coordinators auto (?)
         if (toKill.size()) {
-            // Wait for removal to be safe
-            TraceEvent("RemoveAndKill", functionId)
-                .detail("Step", "Wait For Server Exclusion")
-                .detail("Addresses", describe(toKill))
-                .detail("ClusterAvailable", g_simulator.isAvailable());
-            wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
+            if (!excludeLocalitiesInsteadOfServers) {
+                // Wait for removal to be safe
+                TraceEvent("RemoveAndKill", functionId)
+                    .detail("Step", "Wait For Server Exclusion")
+                    .detail("Addresses", describe(toKill))
+                    .detail("ClusterAvailable", g_simulator.isAvailable());
+                wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
+            }

             TraceEvent("RemoveAndKill", functionId)
                 .detail("Step", "coordinators auto")
@@ -697,6 +746,60 @@ struct RemoveServersSafelyWorkload : TestWorkload {
         return kill.excludes(process) ||
                (machineProcesses.find(kill) != machineProcesses.end() &&
                 machineProcesses[kill].count(AddressExclusion(process.ip, process.port)) > 0);
     }
+
+    // Finds the set of localities that can safely be excluded, given the list of killable addresses.
+    // If excluding a particular locality of a safe process would also kill any other process, that
+    // locality is not included in the killable localities set.
+    std::unordered_set<std::string> getLocalitiesFromAddresses(const std::vector<AddressExclusion>& addresses) {
+        std::unordered_map<std::string, int> allLocalitiesCount;
+        std::unordered_map<std::string, int> killableLocalitiesCount;
+        auto processes = getServers();
+        for (const auto& processInfo : processes) {
+            std::map<std::string, std::string> localityData = processInfo->locality.getAllData();
+            for (const auto& l : localityData) {
+                allLocalitiesCount[LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second]++;
+            }
+
+            AddressExclusion pAddr(processInfo->address.ip, processInfo->address.port);
+            if (std::find(addresses.begin(), addresses.end(), pAddr) != addresses.end()) {
+                for (const auto& l : localityData) {
+                    killableLocalitiesCount[LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" +
+                                            l.second]++;
+                }
+            }
+        }
+
+        std::unordered_set<std::string> toKillLocalities;
+        for (const auto& l : killableLocalitiesCount) {
+            if (l.second == allLocalitiesCount[l.first]) {
+                toKillLocalities.insert(l.first);
+            }
+        }
+
+        return toKillLocalities;
+    }
+
+    // Update the g_simulator processes list with the process ids of the workers,
+    // which are generated as part of worker creation.
+    ACTOR static Future<Void> updateProcessIds(Database cx) {
+        std::vector<ProcessData> workers = wait(getWorkers(cx));
+        std::unordered_map<NetworkAddress, int> addressToIndexMap;
+        for (int i = 0; i < workers.size(); i++) {
+            addressToIndexMap[workers[i].address] = i;
+        }
+
+        vector<ISimulator::ProcessInfo*> processes = g_simulator.getAllProcesses();
+        for (auto process : processes) {
+            if (addressToIndexMap.find(process->address) != addressToIndexMap.end()) {
+                if (workers[addressToIndexMap[process->address]].locality.processId().present()) {
+                    process->locality.set(LocalityData::keyProcessId,
+                                          workers[addressToIndexMap[process->address]].locality.processId());
+                }
+            }
+        }
+
+        return Void();
+    }
 };

 WorkloadFactory<RemoveServersSafelyWorkload> RemoveServersSafelyWorkloadFactory("RemoveServersSafely");
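The safety rule that `getLocalitiesFromAddresses` implements — a locality tag may be excluded only if *every* process carrying that tag is already in the kill set — can be sketched standalone. This is an illustration, not FoundationDB code: `Process`, `safeLocalities`, and the plain `locality_` prefix are stand-ins for the workload's types and `LocalityData::ExcludeLocalityPrefix`.

```cpp
// Sketch of the counting rule: a locality tag is safely excludable only when
// every process carrying that tag is already in the kill set, so excluding it
// cannot take down a process outside that set.
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

struct Process {
    std::string address;                             // stand-in for IP:PORT
    std::map<std::string, std::string> localityData; // e.g. {"zoneid", "z1"}
};

std::unordered_set<std::string> safeLocalities(const std::vector<Process>& processes,
                                               const std::set<std::string>& toKill) {
    std::unordered_map<std::string, int> all, killable;
    for (const auto& p : processes) {
        for (const auto& l : p.localityData) {
            std::string tag = "locality_" + l.first + ":" + l.second;
            all[tag]++;                   // every process with this tag
            if (toKill.count(p.address))
                killable[tag]++;          // only killable processes with it
        }
    }
    std::unordered_set<std::string> result;
    for (const auto& kv : killable)
        if (kv.second == all[kv.first])   // no survivor shares this tag
            result.insert(kv.first);
    return result;
}
```

So if two of three processes share `zoneid:z1` and both are marked killable, `locality_zoneid:z1` is returned; a zone whose only process is outside the kill set never appears, because excluding that zone would kill a process that must survive.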