Merge pull request #4832 from neethuhaneesha/master

exclude servers based on locality match
This commit is contained in:
neethuhaneesha 2021-06-28 12:13:09 -07:00 committed by GitHub
commit 63e530fc44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 860 additions and 83 deletions

View File

@ -182,7 +182,7 @@ You can add new machines to a cluster at any time:
user@host2$ sudo service foundationdb restart
5) If you have previously :ref:`excluded <removing-machines-from-a-cluster>` a machine from the cluster, you will need to take it off the exclusion list using the ``include <ip>`` command of fdbcli before it can be a full participant in the cluster.
5) If you have previously :ref:`excluded <removing-machines-from-a-cluster>` a machine from the cluster, you will need to take it off the exclusion list using the ``include <ip>`` or ``include <locality>`` command of fdbcli before it can be a full participant in the cluster.
.. note:: Addresses have the form ``IP``:``PORT``. This form is used even if TLS is enabled.
@ -207,12 +207,12 @@ To temporarily or permanently remove one or more machines from a FoundationDB cl
The database is available.
Welcome to the fdbcli. For help, type `help'.
fdb> exclude 1.2.3.4 1.2.3.5 1.2.3.6
fdb> exclude 1.2.3.4 1.2.3.5 1.2.3.6 locality_dcid:primary-satellite locality_zoneid:primary-satellite-log-2 locality_machineid:primary-stateless-1 locality_processid:223be2da244ca0182375364e4d122c30 or any locality
Waiting for state to be removed from all excluded servers. This may take a while.
It is now safe to remove these machines or processes from the cluster.
``exclude`` can be used to exclude either machines (by specifying an IP address) or individual processes (by specifying an ``IP``:``PORT`` pair).
``exclude`` can be used to exclude either machines (by specifying an IP address) or individual processes (by specifying an ``IP``:``PORT`` pair or by specifying a locality match).
.. note:: Addresses have the form ``IP``:``PORT``. This form is used even if TLS is enabled.

View File

@ -139,9 +139,9 @@ For more information on setting the cluster description, see :ref:`configuration
exclude
-------
The ``exclude`` command excludes servers from the database or marks them as failed. Its syntax is ``exclude [failed] <ADDRESS...>``. If no addresses are specified, the command provides the set of excluded and failed servers.
The ``exclude`` command excludes servers from the database or marks them as failed. Its syntax is ``exclude [failed] [<ADDRESS...>] [locality_dcid:<excludedcid>] [locality_zoneid:<excludezoneid>] [locality_machineid:<excludemachineid>] [locality_processid:<excludeprocessid>] or any locality``. If no addresses are specified, the command provides the set of excluded and failed servers and localities.
For each IP address or IP:port pair in ``<ADDRESS...>``, the command adds the address to the set of excluded servers. It then waits until all database state has been safely moved off the specified servers.
For each IP address or IP:port pair in ``<ADDRESS...>`` or locality (which include anything set on LocalityData like dcid, zoneid, machineid, processid), the command adds the address/locality to the set of excluded servers and localities. It then waits until all database state has been safely moved off the specified servers.
If the ``failed`` keyword is specified, the address is marked as failed and added to the set of failed servers. It will not wait for the database state to move off the specified servers.
@ -246,15 +246,15 @@ The following options are available for use with the ``option`` command:
include
-------
The ``include`` command permits previously excluded or failed servers to rejoin the database. Its syntax is ``include [failed] all|<ADDRESS...>``.
The ``include`` command permits previously excluded or failed servers/localities to rejoin the database. Its syntax is ``include [failed] all|[<ADDRESS...>] [locality_dcid:<excludedcid>] [locality_zoneid:<excludezoneid>] [locality_machineid:<excludemachineid>] [locality_processid:<excludeprocessid>] or any locality``.
The ``failed`` keyword is required if the servers were previously marked as failed rather than excluded.
If ``all`` is specified, the excluded servers list is cleared. This will not clear the failed servers list.
If ``all`` is specified, the excluded servers and localities list is cleared. This will not clear the failed servers and localities list.
If ``failed all`` or ``all failed`` is specified, the failed servers list is cleared. This will not clear the excluded servers list.
If ``failed all`` or ``all failed`` is specified, the failed servers and localities list is cleared. This will not clear the excluded servers and localities list.
For each IP address or IP:port pair in ``<ADDRESS...>``, the command removes any matching exclusions from the excluded servers list. (A specified IP will match all ``IP:*`` exclusion entries).
For each IP address or IP:port pair in ``<ADDRESS...>`` or locality, the command removes any matching exclusions from the excluded servers/localities list. (A specified IP will match all ``IP:*`` exclusion entries).
For information on adding machines to a cluster, see :ref:`adding-machines-to-a-cluster`.

View File

@ -38,7 +38,7 @@ $ sudo service foundationdb stop
7. Exclude the original machines from the cluster using ``exclude`` in ``fdbcli``. This command will not return until all database state has been moved off of the original machines and fully replicated to the new machines. For example::
fdb> exclude 192.168.1.1:4500 192.168.1.2:4500 192.168.1.3:4500
fdb> exclude 192.168.1.1:4500 192.168.1.2:4500 192.168.1.3:4500 locality_dcid:primary-satellite locality_zoneid:primary-satellite-log-2 locality_machineid:primary-stateless-1 locality_processid:223be2da244ca0182375364e4d122c30 or any locality
8. Run ``coordinators auto`` in ``fdbcli`` to move coordination state to the new machines. Please note that this will cause the fdb.cluster file to be updated with the addresses of the new machines. Any currently connected clients will be notified and (assuming they have appropriate file system :ref:`permissions <cluster_file_permissions>`) will update their own copy of the cluster file. As long as the original machines are still running, any clients that connect to them will be automatically forwarded to the new cluster coordinators. However, if you have a client that has not yet connected or only connects intermittently, you will need to copy the new cluster file from one of the new machines to the client machine.

View File

@ -695,7 +695,8 @@
"coordinators_count":1,
"excluded_servers":[
{
"address":"10.0.4.1"
"address":"10.0.4.1",
"locality":"locality_processid:e9816ca4a89ff64ddb7ba2a5ec10b75b"
}
],
"auto_commit_proxies":3,

View File

@ -201,10 +201,16 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/consistency_check_suspended`` Read/write. Set or read this key will set or read the underlying system key ``\xff\x02/ConsistencyCheck/Suspend``. The value of this special key is unused thus if present, will be empty. In particular, if the key exists, then consistency is suspended. For more details, see help text of ``fdbcli`` command ``consistencycheck``.
#. ``\xff\xff/management/db_locked`` Read/write. A single key that can be read and modified. Set the key will lock the database and clear the key will unlock. If the database is already locked, then the commit will fail with the ``special_keys_api_failure`` error. For more details, see help text of ``fdbcli`` command ``lock`` and ``unlock``.
#. ``\xff\xff/management/auto_coordinators`` Read-only. A single key, if read, will return a set of processes which is able to satisfy the current redundency level and serve as new coordinators. The return value is formatted as a comma delimited string of network addresses of coordinators, i.e. ``<ip:port>,<ip:port>,...,<ip:port>``.
#. ``\xff\xff/management/excluded_locality/<locality>`` Read/write. Indicates that the cluster should move data away from processes matching ``<locality>``, so that they can be safely removed. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
#. ``\xff\xff/management/failed_locality/<locality>`` Read/write. Indicates that the cluster should consider matching processes as permanently failed. This allows the cluster to avoid maintaining extra state and doing extra work in the hope that these processes come back. See :ref:`removing machines from a cluster <removing-machines-from-a-cluster>` for documentation for the corresponding fdbcli command.
#. ``\xff\xff/management/options/excluded_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/excluded_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/options/failed_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
an ip address and port (e.g. ``127.0.0.1:4500``). If no port is specified,
then all processes on that host match the exclusion.
an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g ``locality_dcid:primary-satellite`` or
``locality_zoneid:primary-satellite-log-2`` or ``locality_machineid:primary-stateless-1`` or ``locality_processid:223be2da244ca0182375364e4d122c30``).
If no port is specified, then all processes on that host match the exclusion.
For locality, all processes that match the given locality are excluded.
Configuration module
--------------------

View File

@ -546,21 +546,26 @@ void initHelp() {
"10.0.0.1:4000 10.0.0.2:4000 10.0.0.3:4000\n\nIf 'description=desc' is specified then the description field in "
"the cluster\nfile is changed to desc, which must match [A-Za-z0-9_]+.");
helpMap["exclude"] = CommandHelp(
"exclude [FORCE] [failed] [no_wait] <ADDRESS...>",
"exclude servers from the database",
"If no addresses are specified, lists the set of excluded servers.\n\nFor each IP address or "
"IP:port pair in <ADDRESS...>, adds the address to the set of excluded servers then waits until all "
"database state has been safely moved away from the specified servers. If 'no_wait' is set, the "
"exclude [FORCE] [failed] [no_wait] [<ADDRESS...>] [locality_dcid:<excludedcid>] "
"[locality_zoneid:<excludezoneid>] [locality_machineid:<excludemachineid>] "
"[locality_processid:<excludeprocessid>] or any locality data",
"exclude servers from the database either with IP address match or locality match",
"If no addresses or locaities are specified, lists the set of excluded addresses and localities."
"\n\nFor each IP address or IP:port pair in <ADDRESS...> or any LocalityData attributes (like dcid, zoneid, "
"machineid, processid), adds the address/locality to the set of excluded servers and localities then waits "
"until all database state has been safely moved away from the specified servers. If 'no_wait' is set, the "
"command returns \nimmediately without checking if the exclusions have completed successfully.\n"
"If 'FORCE' is set, the command does not perform safety checks before excluding.\n"
"If 'failed' is set, the transaction log queue is dropped pre-emptively before waiting\n"
"for data movement to finish and the server cannot be included again.");
helpMap["include"] =
CommandHelp("include all|<ADDRESS...>",
"permit previously-excluded servers to rejoin the database",
"If `all' is specified, the excluded servers list is cleared.\n\nFor each IP address or IP:port "
"pair in <ADDRESS...>, removes any matching exclusions from the excluded servers list. (A "
"specified IP will match all IP:* exclusion entries)");
helpMap["include"] = CommandHelp(
"include all|[<ADDRESS...>] [locality_dcid:<excludedcid>] [locality_zoneid:<excludezoneid>] "
"[locality_machineid:<excludemachineid>] [locality_processid:<excludeprocessid>] or any locality data",
"permit previously-excluded servers and localities to rejoin the database",
"If `all' is specified, the excluded servers and localities list is cleared.\n\nFor each IP address or IP:port "
"pair in <ADDRESS...> or any LocalityData (like dcid, zoneid, machineid, processid), removes any "
"matching exclusions from the excluded servers and localities list. "
"(A specified IP will match all IP:* exclusion entries)");
helpMap["setclass"] =
CommandHelp("setclass [<ADDRESS> <CLASS>]",
"change the class of a process",
@ -2388,19 +2393,26 @@ ACTOR Future<bool> coordinators(Database db, std::vector<StringRef> tokens, bool
return err;
}
// Includes the servers that could be IP addresses or localities back to the cluster.
ACTOR Future<bool> include(Database db, std::vector<StringRef> tokens) {
std::vector<AddressExclusion> addresses;
bool failed = false;
bool all = false;
state std::vector<std::string> localities;
state bool failed = false;
state bool all = false;
for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) {
if (*t == LiteralStringRef("all")) {
all = true;
} else if (*t == LiteralStringRef("failed")) {
failed = true;
} else if (t->startsWith(LocalityData::ExcludeLocalityPrefix) && t->toString().find(':') != std::string::npos) {
// if the token starts with 'locality_' prefix.
localities.push_back(t->toString());
} else {
auto a = AddressExclusion::parse(*t);
if (!a.isValid()) {
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
fprintf(stderr,
"ERROR: '%s' is neither a valid network endpoint address nor a locality\n",
t->toString().c_str());
if (t->toString().find(":tls") != std::string::npos)
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
@ -2412,9 +2424,16 @@ ACTOR Future<bool> include(Database db, std::vector<StringRef> tokens) {
std::vector<AddressExclusion> includeAll;
includeAll.push_back(AddressExclusion());
wait(makeInterruptable(includeServers(db, includeAll, failed)));
wait(makeInterruptable(includeLocalities(db, localities, failed, all)));
} else {
if (!addresses.empty()) {
wait(makeInterruptable(includeServers(db, addresses, failed)));
}
if (!localities.empty()) {
// includes the servers that belong to given localities.
wait(makeInterruptable(includeLocalities(db, localities, failed, all)));
}
}
return false;
};
@ -2423,16 +2442,25 @@ ACTOR Future<bool> exclude(Database db,
Reference<ClusterConnectionFile> ccf,
Future<Void> warn) {
if (tokens.size() <= 1) {
vector<AddressExclusion> excl = wait(makeInterruptable(getExcludedServers(db)));
if (!excl.size()) {
printf("There are currently no servers excluded from the database.\n"
state Future<vector<AddressExclusion>> fexclAddresses = makeInterruptable(getExcludedServers(db));
state Future<vector<std::string>> fexclLocalities = makeInterruptable(getExcludedLocalities(db));
wait(success(fexclAddresses) && success(fexclLocalities));
vector<AddressExclusion> exclAddresses = fexclAddresses.get();
vector<std::string> exclLocalities = fexclLocalities.get();
if (!exclAddresses.size() && !exclLocalities.size()) {
printf("There are currently no servers or localities excluded from the database.\n"
"To learn how to exclude a server, type `help exclude'.\n");
return false;
}
printf("There are currently %zu servers or processes being excluded from the database:\n", excl.size());
for (const auto& e : excl)
printf("There are currently %zu servers or localities being excluded from the database:\n",
exclAddresses.size() + exclLocalities.size());
for (const auto& e : exclAddresses)
printf(" %s\n", e.toString().c_str());
for (const auto& e : exclLocalities)
printf(" %s\n", e.c_str());
printf("To find out whether it is safe to remove one or more of these\n"
"servers from the cluster, type `exclude <addresses>'.\n"
@ -2442,9 +2470,13 @@ ACTOR Future<bool> exclude(Database db,
} else {
state std::vector<AddressExclusion> exclusionVector;
state std::set<AddressExclusion> exclusionSet;
bool force = false;
state std::vector<AddressExclusion> exclusionAddresses;
state std::unordered_set<std::string> exclusionLocalities;
state std::vector<std::string> noMatchLocalities;
state bool force = false;
state bool waitForAllExcluded = true;
state bool markFailed = false;
state std::vector<ProcessData> workers = wait(makeInterruptable(getWorkers(db)));
for (auto t = tokens.begin() + 1; t != tokens.end(); ++t) {
if (*t == LiteralStringRef("FORCE")) {
force = true;
@ -2452,19 +2484,38 @@ ACTOR Future<bool> exclude(Database db,
waitForAllExcluded = false;
} else if (*t == LiteralStringRef("failed")) {
markFailed = true;
} else if (t->startsWith(LocalityData::ExcludeLocalityPrefix) &&
t->toString().find(':') != std::string::npos) {
std::set<AddressExclusion> localityAddresses = getAddressesByLocality(workers, t->toString());
if (localityAddresses.empty()) {
noMatchLocalities.push_back(t->toString());
} else {
// add all the server ipaddresses that belong to the given localities to the exclusionSet.
exclusionVector.insert(exclusionVector.end(), localityAddresses.begin(), localityAddresses.end());
exclusionSet.insert(localityAddresses.begin(), localityAddresses.end());
}
exclusionLocalities.insert(t->toString());
} else {
auto a = AddressExclusion::parse(*t);
if (!a.isValid()) {
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", t->toString().c_str());
fprintf(stderr,
"ERROR: '%s' is neither a valid network endpoint address nor a locality\n",
t->toString().c_str());
if (t->toString().find(":tls") != std::string::npos)
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
}
exclusionVector.push_back(a);
exclusionSet.insert(a);
exclusionAddresses.push_back(a);
}
}
if (exclusionAddresses.empty() && exclusionLocalities.empty()) {
fprintf(stderr, "ERROR: At least one valid network endpoint address or a locality is not provided\n");
return true;
}
if (!force) {
if (markFailed) {
state bool safe;
@ -2576,7 +2627,12 @@ ACTOR Future<bool> exclude(Database db,
}
}
wait(makeInterruptable(excludeServers(db, exclusionVector, markFailed)));
if (!exclusionAddresses.empty()) {
wait(makeInterruptable(excludeServers(db, exclusionAddresses, markFailed)));
}
if (!exclusionLocalities.empty()) {
wait(makeInterruptable(excludeLocalities(db, exclusionLocalities, markFailed)));
}
if (waitForAllExcluded) {
printf("Waiting for state to be removed from all excluded servers. This may take a while.\n");
@ -2588,7 +2644,6 @@ ACTOR Future<bool> exclude(Database db,
state std::set<NetworkAddress> notExcludedServers =
wait(makeInterruptable(checkForExcludingServers(db, exclusionVector, waitForAllExcluded)));
std::vector<ProcessData> workers = wait(makeInterruptable(getWorkers(db)));
std::map<IPAddress, std::set<uint16_t>> workerPorts;
for (auto addr : workers)
workerPorts[addr.address.ip].insert(addr.address.port);
@ -2643,6 +2698,14 @@ ACTOR Future<bool> exclude(Database db,
}
}
for (const auto& locality : noMatchLocalities) {
fprintf(
stderr,
" %s ---- WARNING: Currently no servers found with this locality match! Be sure that you excluded "
"the correct locality.\n",
locality.c_str());
}
bool foundCoordinator = false;
auto ccs = ClusterConnectionFile(ccf->getFilename()).getConnectionString();
for (const auto& c : ccs.coordinators()) {

View File

@ -632,6 +632,57 @@ std::set<AddressExclusion> DatabaseConfiguration::getExcludedServers() const {
return addrs;
}
// checks if the locality is excluded or not by checking if the key is present.
bool DatabaseConfiguration::isExcludedLocality(const LocalityData& locality) const {
std::map<std::string, std::string> localityData = locality.getAllData();
for (const auto& l : localityData) {
if (get(StringRef(encodeExcludedLocalityKey(LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" +
l.second)))
.present() ||
get(StringRef(
encodeFailedLocalityKey(LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second)))
.present()) {
return true;
}
}
return false;
}
// checks if this machineid of given locality is excluded.
bool DatabaseConfiguration::isMachineExcluded(const LocalityData& locality) const {
if (locality.machineId().present()) {
return get(encodeExcludedLocalityKey(LocalityData::ExcludeLocalityKeyMachineIdPrefix.toString() +
locality.machineId().get().toString()))
.present() ||
get(encodeFailedLocalityKey(LocalityData::ExcludeLocalityKeyMachineIdPrefix.toString() +
locality.machineId().get().toString()))
.present();
}
return false;
}
// Gets the list of already excluded localities (with failed option)
std::set<std::string> DatabaseConfiguration::getExcludedLocalities() const {
// TODO: revisit all const_cast usages
const_cast<DatabaseConfiguration*>(this)->makeConfigurationImmutable();
std::set<std::string> localities;
for (auto i = lower_bound(rawConfiguration, excludedLocalityKeys.begin);
i != rawConfiguration.end() && i->key < excludedLocalityKeys.end;
++i) {
std::string l = decodeExcludedLocalityKey(i->key);
localities.insert(l);
}
for (auto i = lower_bound(rawConfiguration, failedLocalityKeys.begin);
i != rawConfiguration.end() && i->key < failedLocalityKeys.end;
++i) {
std::string l = decodeFailedLocalityKey(i->key);
localities.insert(l);
}
return localities;
}
void DatabaseConfiguration::makeConfigurationMutable() {
if (mutableConfiguration.present())
return;

View File

@ -248,7 +248,10 @@ struct DatabaseConfiguration {
// Excluded servers (no state should be here)
bool isExcludedServer(NetworkAddressList) const;
bool isExcludedLocality(const LocalityData& locality) const;
bool isMachineExcluded(const LocalityData& locality) const;
std::set<AddressExclusion> getExcludedServers() const;
std::set<std::string> getExcludedLocalities() const;
int32_t getDesiredCommitProxies() const {
if (commitProxyCount == -1)

View File

@ -25,6 +25,7 @@
#include <set>
#include <string>
#include <vector>
#include <unordered_set>
#include "flow/Arena.h"
#include "flow/flow.h"
@ -244,6 +245,11 @@ std::string describe(std::vector<T> const& items, int max_items = -1) {
return describeList(items, max_items);
}
template <class T>
std::string describe(std::unordered_set<T> const& items, int max_items = -1) {
return describeList(items, max_items);
}
template <typename T>
struct Traceable<std::vector<T>> : std::true_type {
static std::string toString(const std::vector<T>& value) { return describe(value); }

View File

@ -40,6 +40,7 @@
#include "fdbrpc/ReplicationPolicy.h"
#include "fdbrpc/Replication.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "fdbclient/Schemas.h"
bool isInteger(const std::string& s) {
if (s.empty())
@ -1576,6 +1577,7 @@ ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers,
wait(ryw.commit());
return Void();
} catch (Error& e) {
TraceEvent("ExcludeServersError").error(e, true);
wait(ryw.onError(e));
}
}
@ -1587,6 +1589,70 @@ ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers,
wait(tr.commit());
return Void();
} catch (Error& e) {
TraceEvent("ExcludeServersError").error(e, true);
wait(tr.onError(e));
}
}
}
}
// excludes localities by setting the keys in api version below 7.0
void excludeLocalities(Transaction& tr, std::unordered_set<std::string> localities, bool failed) {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString();
auto localityVersionKey = failed ? failedLocalityVersionKey : excludedLocalityVersionKey;
tr.addReadConflictRange(singleKeyRange(localityVersionKey)); // To conflict with parallel includeLocalities
tr.set(localityVersionKey, excludeVersionKey);
for (const auto& l : localities) {
if (failed) {
tr.set(encodeFailedLocalityKey(l), StringRef());
} else {
tr.set(encodeExcludedLocalityKey(l), StringRef());
}
}
TraceEvent("ExcludeLocalitiesCommit").detail("Localities", describe(localities)).detail("ExcludeFailed", failed);
}
// Exclude the servers matching the given set of localities from use as state servers.
// excludes localities by setting the keys.
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string> localities, bool failed) {
if (cx->apiVersionAtLeast(700)) {
state ReadYourWritesTransaction ryw(cx);
loop {
try {
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
failed ? "failed_locality" : "excluded_locality", "force"),
ValueRef());
for (const auto& l : localities) {
Key addr = failed
? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l)
: SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l);
ryw.set(addr, ValueRef());
}
TraceEvent("ExcludeLocalitiesSpecialKeySpaceCommit")
.detail("Localities", describe(localities))
.detail("ExcludeFailed", failed);
wait(ryw.commit());
return Void();
} catch (Error& e) {
TraceEvent("ExcludeLocalitiesError").error(e, true);
wait(ryw.onError(e));
}
}
} else {
state Transaction tr(cx);
loop {
try {
excludeLocalities(tr, localities, failed);
wait(tr.commit());
return Void();
} catch (Error& e) {
TraceEvent("ExcludeLocalitiesError").error(e, true);
wait(tr.onError(e));
}
}
@ -1694,6 +1760,92 @@ ACTOR Future<Void> includeServers(Database cx, vector<AddressExclusion> servers,
}
}
// Remove the given localities from the exclusion list.
// include localities by clearing the keys.
ACTOR Future<Void> includeLocalities(Database cx, vector<std::string> localities, bool failed, bool includeAll) {
state std::string versionKey = deterministicRandom()->randomUniqueID().toString();
if (cx->apiVersionAtLeast(700)) {
state ReadYourWritesTransaction ryw(cx);
loop {
try {
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
if (includeAll) {
if (failed) {
ryw.clear(SpecialKeySpace::getManamentApiCommandRange("failedlocality"));
} else {
ryw.clear(SpecialKeySpace::getManamentApiCommandRange("excludedlocality"));
}
} else {
for (const auto& l : localities) {
Key locality =
failed ? SpecialKeySpace::getManagementApiCommandPrefix("failedlocality").withSuffix(l)
: SpecialKeySpace::getManagementApiCommandPrefix("excludedlocality").withSuffix(l);
ryw.clear(locality);
}
}
TraceEvent("IncludeLocalitiesCommit")
.detail("Localities", describe(localities))
.detail("Failed", failed)
.detail("IncludeAll", includeAll);
wait(ryw.commit());
return Void();
} catch (Error& e) {
TraceEvent("IncludeLocalitiesError").error(e, true);
wait(ryw.onError(e));
}
}
} else {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
// includeLocalities might be used in an emergency transaction, so make sure it is
// retry-self-conflicting and CAUSAL_WRITE_RISKY
tr.setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY);
if (failed) {
tr.addReadConflictRange(singleKeyRange(failedLocalityVersionKey));
tr.set(failedLocalityVersionKey, versionKey);
} else {
tr.addReadConflictRange(singleKeyRange(excludedLocalityVersionKey));
tr.set(excludedLocalityVersionKey, versionKey);
}
if (includeAll) {
if (failed) {
tr.clear(failedLocalityKeys);
} else {
tr.clear(excludedLocalityKeys);
}
} else {
for (const auto& l : localities) {
if (failed) {
tr.clear(encodeFailedLocalityKey(l));
} else {
tr.clear(encodeExcludedLocalityKey(l));
}
}
}
TraceEvent("IncludeLocalitiesCommit")
.detail("Localities", describe(localities))
.detail("Failed", failed)
.detail("IncludeAll", includeAll);
wait(tr.commit());
return Void();
} catch (Error& e) {
TraceEvent("IncludeLocalitiesError").error(e, true);
wait(tr.onError(e));
}
}
}
}
ACTOR Future<Void> setClass(Database cx, AddressExclusion server, ProcessClass processClass) {
state Transaction tr(cx);
@ -1765,6 +1917,72 @@ ACTOR Future<vector<AddressExclusion>> getExcludedServers(Database cx) {
}
}
// Get the current list of excluded localities by reading the keys.
ACTOR Future<vector<std::string>> getExcludedLocalities(Transaction* tr) {
state RangeResult r = wait(tr->getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!r.more && r.size() < CLIENT_KNOBS->TOO_MANY);
state RangeResult r2 = wait(tr->getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!r2.more && r2.size() < CLIENT_KNOBS->TOO_MANY);
vector<std::string> excludedLocalities;
for (const auto& i : r) {
auto a = decodeExcludedLocalityKey(i.key);
excludedLocalities.push_back(a);
}
for (const auto& i : r2) {
auto a = decodeFailedLocalityKey(i.key);
excludedLocalities.push_back(a);
}
uniquify(excludedLocalities);
return excludedLocalities;
}
// Get the list of excluded localities by reading the keys.
ACTOR Future<vector<std::string>> getExcludedLocalities(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
vector<std::string> exclusions = wait(getExcludedLocalities(&tr));
return exclusions;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
// Decodes the locality string to a pair of locality prefix and its value.
// The prefix could be dcid, processid, machineid, processid.
std::pair<std::string, std::string> decodeLocality(const std::string& locality) {
StringRef localityRef(locality.c_str());
std::string localityKeyValue = localityRef.removePrefix(LocalityData::ExcludeLocalityPrefix).toString();
int split = localityKeyValue.find(':');
if (split != std::string::npos) {
return std::make_pair(localityKeyValue.substr(0, split), localityKeyValue.substr(split + 1));
}
return std::make_pair("", "");
}
// Returns the list of IPAddresses of the workers that match the given locality.
// Example: locality="dcid:primary" returns all the ip addresses of the workers in the primary dc.
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers,
const std::string& locality) {
std::pair<std::string, std::string> localityKeyValue = decodeLocality(locality);
std::set<AddressExclusion> localityAddresses;
for (int i = 0; i < workers.size(); i++) {
if (workers[i].locality.isPresent(localityKeyValue.first) &&
workers[i].locality.get(localityKeyValue.first) == localityKeyValue.second) {
localityAddresses.insert(AddressExclusion(workers[i].address.ip, workers[i].address.port));
}
}
return localityAddresses;
}
ACTOR Future<Void> printHealthyZone(Database cx) {
state Transaction tr(cx);
loop {

View File

@ -163,10 +163,21 @@ Reference<IQuorumChange> nameQuorumChange(std::string const& name, Reference<IQu
ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers, bool failed = false);
void excludeServers(Transaction& tr, vector<AddressExclusion>& servers, bool failed = false);
// Exclude the servers matching the given set of localities from use as state servers. Returns as soon as the change
// is durable, without necessarily waiting for the servers to be evacuated.
ACTOR Future<Void> excludeLocalities(Database cx, std::unordered_set<std::string> localities, bool failed = false);
void excludeLocalities(Transaction& tr, std::unordered_set<std::string> localities, bool failed = false);
// Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given
// IP. A NetworkAddress() means all servers (don't exclude anything)
ACTOR Future<Void> includeServers(Database cx, vector<AddressExclusion> servers, bool failed = false);
// Remove the given localities from the exclusion list.
ACTOR Future<Void> includeLocalities(Database cx,
vector<std::string> localities,
bool failed = false,
bool includeAll = false);
// Set the process class of processes with the given address. A NetworkAddress with a port of 0 means all servers on
// the given IP.
ACTOR Future<Void> setClass(Database cx, AddressExclusion server, ProcessClass processClass);
@ -175,6 +186,12 @@ ACTOR Future<Void> setClass(Database cx, AddressExclusion server, ProcessClass p
ACTOR Future<vector<AddressExclusion>> getExcludedServers(Database cx);
ACTOR Future<vector<AddressExclusion>> getExcludedServers(Transaction* tr);
// Get the current list of excluded localities
ACTOR Future<vector<std::string>> getExcludedLocalities(Database cx);
ACTOR Future<vector<std::string>> getExcludedLocalities(Transaction* tr);
std::set<AddressExclusion> getAddressesByLocality(const std::vector<ProcessData>& workers, const std::string& locality);
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForExclusion is
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and
// unless any of them are explicitly included with includeServers()

View File

@ -1159,6 +1159,14 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedServersRangeImpl>(SpecialKeySpace::getManamentApiCommandRange("failed")));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludedLocalitiesRangeImpl>(
SpecialKeySpace::getManamentApiCommandRange("excludedlocality")));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedLocalitiesRangeImpl>(SpecialKeySpace::getManamentApiCommandRange("failedlocality")));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT,
SpecialKeySpace::IMPLTYPE::READONLY,

View File

@ -747,7 +747,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"coordinators_count":1,
"excluded_servers":[
{
"address":"10.0.4.1"
"address":"10.0.4.1",
"locality":"locality_processid:e9816ca4a89ff64ddb7ba2a5ec10b75b"
}
],
"auto_commit_proxies":3,

View File

@ -77,6 +77,12 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
{ "failed",
KeyRangeRef(LiteralStringRef("failed/"), LiteralStringRef("failed0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "excludedlocality",
KeyRangeRef(LiteralStringRef("excluded_locality/"), LiteralStringRef("excluded_locality0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "failedlocality",
KeyRangeRef(LiteralStringRef("failed_locality/"), LiteralStringRef("failed_locality0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "lock", singleKeyRange(LiteralStringRef("db_locked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "consistencycheck",
singleKeyRange(LiteralStringRef("consistency_check_suspended"))
@ -98,7 +104,10 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
};
std::set<std::string> SpecialKeySpace::options = { "excluded/force", "failed/force" };
std::set<std::string> SpecialKeySpace::options = { "excluded/force",
"failed/force",
"excluded_locality/force",
"failed_locality/force" };
std::set<std::string> SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey };
@ -2110,3 +2119,158 @@ Future<Optional<std::string>> DataDistributionImpl::commit(ReadYourWritesTransac
}
return msg;
}
// Clears the special management api keys excludeLocality and failedLocality.
void includeLocalities(ReadYourWritesTransaction* ryw) {
ryw->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
ryw->setOption(FDBTransactionOptions::LOCK_AWARE);
ryw->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
// includeLocalities might be used in an emergency transaction, so make sure it is retry-self-conflicting and
// CAUSAL_WRITE_RISKY
ryw->setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY);
std::string versionKey = deterministicRandom()->randomUniqueID().toString();
// for excluded localities
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(
SpecialKeySpace::getManamentApiCommandRange("excludedlocality"));
Transaction& tr = ryw->getTransaction();
for (auto& iter : ranges) {
auto entry = iter.value();
if (entry.first && !entry.second.present()) {
tr.addReadConflictRange(singleKeyRange(excludedLocalityVersionKey));
tr.set(excludedLocalityVersionKey, versionKey);
tr.clear(ryw->getDatabase()->specialKeySpace->decode(iter.range()));
}
}
// for failed localities
ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(
SpecialKeySpace::getManamentApiCommandRange("failedlocality"));
for (auto& iter : ranges) {
auto entry = iter.value();
if (entry.first && !entry.second.present()) {
tr.addReadConflictRange(singleKeyRange(failedLocalityVersionKey));
tr.set(failedLocalityVersionKey, versionKey);
tr.clear(ryw->getDatabase()->specialKeySpace->decode(iter.range()));
}
}
}
// Reads the excludedlocality and failed locality keys using managment api,
// parses them and returns the list.
bool parseLocalitiesFromKeys(ReadYourWritesTransaction* ryw,
bool failed,
std::unordered_set<std::string>& localities,
std::vector<AddressExclusion>& addresses,
std::set<AddressExclusion>& exclusions,
std::vector<ProcessData>& workers,
Optional<std::string>& msg) {
KeyRangeRef range = failed ? SpecialKeySpace::getManamentApiCommandRange("failedlocality")
: SpecialKeySpace::getManamentApiCommandRange("excludedlocality");
auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
auto iter = ranges.begin();
while (iter != ranges.end()) {
auto entry = iter->value();
// only check for exclude(set) operation, include(clear) are not checked
TraceEvent(SevDebug, "ParseLocalities")
.detail("Valid", entry.first)
.detail("Set", entry.second.present())
.detail("Key", iter->begin().toString());
if (entry.first && entry.second.present()) {
Key locality = iter->begin().removePrefix(range.begin);
if (locality.startsWith(LocalityData::ExcludeLocalityPrefix) &&
locality.toString().find(':') != std::string::npos) {
std::set<AddressExclusion> localityAddresses = getAddressesByLocality(workers, locality.toString());
if (!localityAddresses.empty()) {
std::copy(localityAddresses.begin(), localityAddresses.end(), back_inserter(addresses));
exclusions.insert(localityAddresses.begin(), localityAddresses.end());
}
localities.insert(locality.toString());
} else {
std::string error = "ERROR: \'" + locality.toString() + "\' is not a valid locality\n";
msg = ManagementAPIError::toJsonString(
false, entry.second.present() ? (failed ? "exclude failed" : "exclude") : "include", error);
return false;
}
}
++iter;
}
return true;
}
// On commit, parses the special exclusion keys and get the localities to be excluded, check for exclusions
// and add them to the exclusion list. Also, clears the special management api keys with includeLocalities.
ACTOR Future<Optional<std::string>> excludeLocalityCommitActor(ReadYourWritesTransaction* ryw, bool failed) {
state Optional<std::string> result;
state std::unordered_set<std::string> localities;
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
state std::vector<ProcessData> workers = wait(getWorkers(&ryw->getTransaction()));
if (!parseLocalitiesFromKeys(ryw, failed, localities, addresses, exclusions, workers, result))
return result;
// If force option is not set, we need to do safety check
auto force = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
failed ? "failed_locality" : "excluded_locality", "force")];
// only do safety check when we have localities to be excluded and the force option key is not set
if (localities.size() && !(force.first && force.second.present())) {
bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, failed, &result));
if (!safe)
return result;
}
excludeLocalities(ryw->getTransaction(), localities, failed);
includeLocalities(ryw);
return result;
}
ExcludedLocalitiesRangeImpl::ExcludedLocalitiesRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<RangeResult> ExcludedLocalitiesRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return rwModuleWithMappingGetRangeActor(ryw, this, kr);
}
void ExcludedLocalitiesRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
// ignore value
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(ValueRef())));
}
Key ExcludedLocalitiesRangeImpl::decode(const KeyRef& key) const {
return key.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.withPrefix(LiteralStringRef("\xff/conf/"));
}
Key ExcludedLocalitiesRangeImpl::encode(const KeyRef& key) const {
return key.removePrefix(LiteralStringRef("\xff/conf/"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
}
Future<Optional<std::string>> ExcludedLocalitiesRangeImpl::commit(ReadYourWritesTransaction* ryw) {
// exclude locality with failed option as false.
return excludeLocalityCommitActor(ryw, false);
}
FailedLocalitiesRangeImpl::FailedLocalitiesRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<RangeResult> FailedLocalitiesRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return rwModuleWithMappingGetRangeActor(ryw, this, kr);
}
void FailedLocalitiesRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
// ignore value
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(ValueRef())));
}
Key FailedLocalitiesRangeImpl::decode(const KeyRef& key) const {
return key.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.withPrefix(LiteralStringRef("\xff/conf/"));
}
Key FailedLocalitiesRangeImpl::encode(const KeyRef& key) const {
return key.removePrefix(LiteralStringRef("\xff/conf/"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
}
Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTransaction* ryw) {
// exclude locality with failed option as true.
return excludeLocalityCommitActor(ryw, true);
}

View File

@ -280,6 +280,28 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
// Special key management api for excluding localities (exclude_locality)
class ExcludedLocalitiesRangeImpl : public SpecialKeyRangeRWImpl {
public:
explicit ExcludedLocalitiesRangeImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
Key decode(const KeyRef& key) const override;
Key encode(const KeyRef& key) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
// Special key management api for excluding localities with failed option (failed_locality)
class FailedLocalitiesRangeImpl : public SpecialKeyRangeRWImpl {
public:
explicit FailedLocalitiesRangeImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
Key decode(const KeyRef& key) const override;
Key encode(const KeyRef& key) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class ExcludeServersRangeImpl : public SpecialKeyRangeRWImpl {
public:
explicit ExcludeServersRangeImpl(KeyRangeRef kr);

View File

@ -634,7 +634,7 @@ const KeyRef triggerDDTeamInfoPrintKey(LiteralStringRef("\xff/triggerDDTeamInfoP
const KeyRangeRef excludedServersKeys(LiteralStringRef("\xff/conf/excluded/"), LiteralStringRef("\xff/conf/excluded0"));
const KeyRef excludedServersPrefix = excludedServersKeys.begin;
const KeyRef excludedServersVersionKey = LiteralStringRef("\xff/conf/excluded");
const AddressExclusion decodeExcludedServersKey(KeyRef const& key) {
AddressExclusion decodeExcludedServersKey(KeyRef const& key) {
ASSERT(key.startsWith(excludedServersPrefix));
// Returns an invalid NetworkAddress if given an invalid key (within the prefix)
// Excluded servers have IP in x.x.x.x format, port optional, and no SSL suffix
@ -648,10 +648,22 @@ std::string encodeExcludedServersKey(AddressExclusion const& addr) {
return excludedServersPrefix.toString() + addr.toString();
}
const KeyRangeRef excludedLocalityKeys(LiteralStringRef("\xff/conf/excluded_locality/"),
LiteralStringRef("\xff/conf/excluded_locality0"));
const KeyRef excludedLocalityPrefix = excludedLocalityKeys.begin;
const KeyRef excludedLocalityVersionKey = LiteralStringRef("\xff/conf/excluded_locality");
std::string decodeExcludedLocalityKey(KeyRef const& key) {
ASSERT(key.startsWith(excludedLocalityPrefix));
return key.removePrefix(excludedLocalityPrefix).toString();
}
std::string encodeExcludedLocalityKey(std::string const& locality) {
return excludedLocalityPrefix.toString() + locality;
}
const KeyRangeRef failedServersKeys(LiteralStringRef("\xff/conf/failed/"), LiteralStringRef("\xff/conf/failed0"));
const KeyRef failedServersPrefix = failedServersKeys.begin;
const KeyRef failedServersVersionKey = LiteralStringRef("\xff/conf/failed");
const AddressExclusion decodeFailedServersKey(KeyRef const& key) {
AddressExclusion decodeFailedServersKey(KeyRef const& key) {
ASSERT(key.startsWith(failedServersPrefix));
// Returns an invalid NetworkAddress if given an invalid key (within the prefix)
// Excluded servers have IP in x.x.x.x format, port optional, and no SSL suffix
@ -665,6 +677,18 @@ std::string encodeFailedServersKey(AddressExclusion const& addr) {
return failedServersPrefix.toString() + addr.toString();
}
const KeyRangeRef failedLocalityKeys(LiteralStringRef("\xff/conf/failed_locality/"),
LiteralStringRef("\xff/conf/failed_locality0"));
const KeyRef failedLocalityPrefix = failedLocalityKeys.begin;
const KeyRef failedLocalityVersionKey = LiteralStringRef("\xff/conf/failed_locality");
std::string decodeFailedLocalityKey(KeyRef const& key) {
ASSERT(key.startsWith(failedLocalityPrefix));
return key.removePrefix(failedLocalityPrefix).toString();
}
std::string encodeFailedLocalityKey(std::string const& locality) {
return failedLocalityPrefix.toString() + locality;
}
// const KeyRangeRef globalConfigKeys( LiteralStringRef("\xff/globalConfig/"), LiteralStringRef("\xff/globalConfig0") );
// const KeyRef globalConfigPrefix = globalConfigKeys.begin;

View File

@ -224,9 +224,16 @@ extern const KeyRef excludedServersPrefix;
extern const KeyRangeRef excludedServersKeys;
extern const KeyRef excludedServersVersionKey; // The value of this key shall be changed by any transaction that
// modifies the excluded servers list
const AddressExclusion decodeExcludedServersKey(KeyRef const& key); // where key.startsWith(excludedServersPrefix)
AddressExclusion decodeExcludedServersKey(KeyRef const& key); // where key.startsWith(excludedServersPrefix)
std::string encodeExcludedServersKey(AddressExclusion const&);
extern const KeyRef excludedLocalityPrefix;
extern const KeyRangeRef excludedLocalityKeys;
extern const KeyRef excludedLocalityVersionKey; // The value of this key shall be changed by any transaction that
// modifies the excluded localities list
std::string decodeExcludedLocalityKey(KeyRef const& key); // where key.startsWith(excludedLocalityPrefix)
std::string encodeExcludedLocalityKey(std::string const&);
// "\xff/conf/failed/1.2.3.4" := ""
// "\xff/conf/failed/1.2.3.4:4000" := ""
// These are inside configKeysPrefix since they represent a form of configuration and they are convenient
@ -238,9 +245,16 @@ extern const KeyRef failedServersPrefix;
extern const KeyRangeRef failedServersKeys;
extern const KeyRef failedServersVersionKey; // The value of this key shall be changed by any transaction that modifies
// the failed servers list
const AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix)
AddressExclusion decodeFailedServersKey(KeyRef const& key); // where key.startsWith(failedServersPrefix)
std::string encodeFailedServersKey(AddressExclusion const&);
extern const KeyRef failedLocalityPrefix;
extern const KeyRangeRef failedLocalityKeys;
extern const KeyRef failedLocalityVersionKey; // The value of this key shall be changed by any transaction that modifies
// the failed localities list
std::string decodeFailedLocalityKey(KeyRef const& key); // where key.startsWith(failedLocalityPrefix)
std::string encodeFailedLocalityKey(std::string const&);
// "\xff/globalConfig/[[option]]" := "value"
// An umbrella prefix for global configuration data synchronized to all nodes.
// extern const KeyRangeRef globalConfigData;

View File

@ -26,6 +26,8 @@ const StringRef LocalityData::keyZoneId = LiteralStringRef("zoneid");
const StringRef LocalityData::keyDcId = LiteralStringRef("dcid");
const StringRef LocalityData::keyMachineId = LiteralStringRef("machineid");
const StringRef LocalityData::keyDataHallId = LiteralStringRef("data_hall");
const StringRef LocalityData::ExcludeLocalityKeyMachineIdPrefix = LiteralStringRef("locality_machineid:");
const StringRef LocalityData::ExcludeLocalityPrefix = LiteralStringRef("locality_");
ProcessClass::Fitness ProcessClass::machineClassFitness(ClusterRole role) const {
switch (role) {

View File

@ -308,7 +308,19 @@ public:
}
}
std::map<std::string, std::string> getAllData() const {
std::map<std::string, std::string> data;
for (const auto& d : _data) {
if (d.second.present()) {
data[d.first.toString()] = d.second.get().toString();
}
}
return data;
}
static const UID UNSET_ID;
static const StringRef ExcludeLocalityKeyMachineIdPrefix;
static const StringRef ExcludeLocalityPrefix;
};
static std::string describe(std::vector<LocalityData> const& items, StringRef const key, int max_items = -1) {

View File

@ -213,7 +213,9 @@ void applyMetadataMutations(SpanID const& spanContext,
.castTo<StringRef>()) { // FIXME: Make this check more specific, here or by reading
// configuration whenever there is a change
if ((!m.param1.startsWith(excludedServersPrefix) && m.param1 != excludedServersVersionKey) &&
(!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey)) {
(!m.param1.startsWith(failedServersPrefix) && m.param1 != failedServersVersionKey) &&
(!m.param1.startsWith(excludedLocalityPrefix) && m.param1 != excludedLocalityVersionKey) &&
(!m.param1.startsWith(failedLocalityPrefix) && m.param1 != failedLocalityVersionKey)) {
auto t = txnStateStore->readValue(m.param1).get();
TraceEvent("MutationRequiresRestart", dbgid)
.detail("M", m.toString())
@ -427,7 +429,8 @@ void applyMetadataMutations(SpanID const& spanContext,
if (configKeys.intersects(range)) {
if (!initialCommit)
txnStateStore->clear(range & configKeys);
if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range)) {
if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) &&
!excludedLocalityKeys.contains(range) && !failedLocalityKeys.contains(range)) {
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString());
confChange = true;
}

View File

@ -3815,16 +3815,26 @@ ACTOR Future<Void> trackExcludedServers(DDTeamCollection* self) {
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Future<RangeResult> fresultsExclude = tr.getRange(excludedServersKeys, CLIENT_KNOBS->TOO_MANY);
state Future<RangeResult> fresultsFailed = tr.getRange(failedServersKeys, CLIENT_KNOBS->TOO_MANY);
wait(success(fresultsExclude) && success(fresultsFailed));
state Future<RangeResult> flocalitiesExclude = tr.getRange(excludedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
state Future<RangeResult> flocalitiesFailed = tr.getRange(failedLocalityKeys, CLIENT_KNOBS->TOO_MANY);
state Future<std::vector<ProcessData>> fworkers = getWorkers(self->cx);
wait(success(fresultsExclude) && success(fresultsFailed) && success(flocalitiesExclude) &&
success(flocalitiesFailed));
RangeResult excludedResults = fresultsExclude.get();
state RangeResult excludedResults = fresultsExclude.get();
ASSERT(!excludedResults.more && excludedResults.size() < CLIENT_KNOBS->TOO_MANY);
RangeResult failedResults = fresultsFailed.get();
state RangeResult failedResults = fresultsFailed.get();
ASSERT(!failedResults.more && failedResults.size() < CLIENT_KNOBS->TOO_MANY);
std::set<AddressExclusion> excluded;
std::set<AddressExclusion> failed;
state RangeResult excludedLocalityResults = flocalitiesExclude.get();
ASSERT(!excludedLocalityResults.more && excludedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY);
state RangeResult failedLocalityResults = flocalitiesFailed.get();
ASSERT(!failedLocalityResults.more && failedLocalityResults.size() < CLIENT_KNOBS->TOO_MANY);
state std::set<AddressExclusion> excluded;
state std::set<AddressExclusion> failed;
for (const auto& r : excludedResults) {
AddressExclusion addr = decodeExcludedServersKey(r.key);
if (addr.isValid()) {
@ -3838,6 +3848,19 @@ ACTOR Future<Void> trackExcludedServers(DDTeamCollection* self) {
}
}
wait(success(fworkers));
std::vector<ProcessData> workers = fworkers.get();
for (const auto& r : excludedLocalityResults) {
std::string locality = decodeExcludedLocalityKey(r.key);
std::set<AddressExclusion> localityExcludedAddresses = getAddressesByLocality(workers, locality);
excluded.insert(localityExcludedAddresses.begin(), localityExcludedAddresses.end());
}
for (const auto& r : failedLocalityResults) {
std::string locality = decodeFailedLocalityKey(r.key);
std::set<AddressExclusion> localityFailedAddresses = getAddressesByLocality(workers, locality);
failed.insert(localityFailedAddresses.begin(), localityFailedAddresses.end());
}
// Reset and reassign self->excludedServers based on excluded, but we only
// want to trigger entries that are different
// Do not retrigger and double-overwrite failed or wiggling servers
@ -3860,11 +3883,14 @@ ACTOR Future<Void> trackExcludedServers(DDTeamCollection* self) {
}
TraceEvent("DDExcludedServersChanged", self->distributorId)
.detail("RowsExcluded", excludedResults.size())
.detail("RowsFailed", failedResults.size());
.detail("AddressesExcluded", excludedResults.size())
.detail("AddressesFailed", failedResults.size())
.detail("LocalitiesExcluded", excludedLocalityResults.size())
.detail("LocalitiesFailed", failedLocalityResults.size());
self->restartRecruiting.trigger();
state Future<Void> watchFuture = tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey);
state Future<Void> watchFuture = tr.watch(excludedServersVersionKey) || tr.watch(failedServersVersionKey) ||
tr.watch(excludedLocalityVersionKey) || tr.watch(failedLocalityVersionKey);
wait(tr.commit());
wait(watchFuture);
tr.reset();

View File

@ -1032,6 +1032,13 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServe
? tr->get(StringRef(encodeFailedServersKey(AddressExclusion(server.secondaryAddress().get().ip))))
: Future<Optional<Value>>(Optional<Value>());
state vector<Future<Optional<Value>>> localityExclusions;
std::map<std::string, std::string> localityData = server.locality.getAllData();
for (const auto& l : localityData) {
localityExclusions.push_back(tr->get(StringRef(encodeExcludedLocalityKey(
LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second))));
}
state Future<RangeResult> fTags = tr->getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY, true);
state Future<RangeResult> fHistoryTags = tr->getRange(serverTagHistoryKeys, CLIENT_KNOBS->TOO_MANY, true);
@ -1039,13 +1046,23 @@ ACTOR Future<std::pair<Version, Tag>> addStorageServer(Database cx, StorageServe
success(fExclProc) && success(fExclIP) && success(fFailProc) && success(fFailIP) &&
success(fExclProc2) && success(fExclIP2) && success(fFailProc2) && success(fFailIP2));
// If we have been added to the excluded/failed state servers list, we have to fail
for (const auto& exclusion : localityExclusions) {
wait(success(exclusion));
}
// If we have been added to the excluded/failed state servers or localities list, we have to fail
if (fExclProc.get().present() || fExclIP.get().present() || fFailProc.get().present() ||
fFailIP.get().present() || fExclProc2.get().present() || fExclIP2.get().present() ||
fFailProc2.get().present() || fFailIP2.get().present()) {
throw recruitment_failed();
}
for (const auto& exclusion : localityExclusions) {
if (exclusion.get().present()) {
throw recruitment_failed();
}
}
if (fTagLocalities.get().more || fTags.get().more || fHistoryTags.get().more)
ASSERT(false);

View File

@ -1394,9 +1394,10 @@ ACTOR Future<Void> configurationMonitor(RatekeeperData* self) {
self->configuration.fromKeyValues((VectorRef<KeyValueRef>)results);
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) ||
tr.watch(excludedServersVersionKey) ||
tr.watch(failedServersVersionKey);
state Future<Void> watchFuture =
tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) ||
tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) ||
tr.watch(failedLocalityVersionKey);
wait(tr.commit());
wait(watchFuture);
break;

View File

@ -358,8 +358,15 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics,
// FIXME: this will not catch if the secondary address of the process was excluded
NetworkAddressList tempList;
tempList.address = it->first;
if (configuration.present() && !configuration.get().isExcludedServer(tempList))
notExcludedMap[machineId] = false;
bool excludedServer = false;
bool excludedLocality = false;
if (configuration.present() && configuration.get().isExcludedServer(tempList))
excludedServer = true;
if (locality.count(it->first) && configuration.present() &&
configuration.get().isMachineExcluded(locality[it->first]))
excludedLocality = true;
notExcludedMap[machineId] = excludedServer || excludedLocality;
workerContribMap[machineId]++;
} catch (Error&) {
++failed;
@ -1012,7 +1019,8 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj["roles"] = roles.getStatusForAddress(address);
if (configuration.present()) {
statusObj["excluded"] = configuration.get().isExcludedServer(workerItr->interf.addresses());
statusObj["excluded"] = configuration.get().isExcludedServer(workerItr->interf.addresses()) ||
configuration.get().isExcludedLocality(workerItr->interf.locality);
}
statusObj["class_type"] = workerItr->processClass.toString();
@ -1605,6 +1613,12 @@ static JsonBuilderObject configurationFetcher(Optional<DatabaseConfiguration> co
statusObj["address"] = it->toString();
excludedServersArr.push_back(statusObj);
}
std::set<std::string> excludedLocalities = configuration.getExcludedLocalities();
for (const auto& it : excludedLocalities) {
JsonBuilderObject statusObj;
statusObj["locality"] = it;
excludedServersArr.push_back(statusObj);
}
statusObj["excluded_servers"] = excludedServersArr;
}
vector<ClientLeaderRegInterface> coordinatorLeaderServers = coordinators.clientLeaderServers;

View File

@ -1537,9 +1537,10 @@ ACTOR Future<Void> configurationMonitor(Reference<MasterData> self, Database cx)
self->registrationTrigger.trigger();
}
state Future<Void> watchFuture = tr.watch(moveKeysLockOwnerKey) ||
tr.watch(excludedServersVersionKey) ||
tr.watch(failedServersVersionKey);
state Future<Void> watchFuture =
tr.watch(moveKeysLockOwnerKey) || tr.watch(excludedServersVersionKey) ||
tr.watch(failedServersVersionKey) || tr.watch(excludedLocalityVersionKey) ||
tr.watch(failedLocalityVersionKey);
wait(tr.commit());
wait(watchFuture);
break;

View File

@ -75,7 +75,10 @@ struct RemoveServersSafelyWorkload : TestWorkload {
TraceEvent("RemoveAndKill")
.detail("Step", "listAddresses")
.detail("Address", pAddr.toString())
.detail("Process", describe(*it));
.detail("Process", describe(*it))
.detail("Dcid", it->locality.dcId().get().toString())
.detail("Zoneid", it->locality.zoneId().get().toString())
.detail("MachineId", it->locality.machineId().get().toString());
if (g_simulator.protectedAddresses.count(it->address) == 0)
processAddrs.push_back(pAddr);
@ -335,6 +338,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
double waitSeconds,
std::set<AddressExclusion> toKill1,
std::set<AddressExclusion> toKill2) {
wait(updateProcessIds(cx));
wait(delay(waitSeconds));
// Removing the first set of machines might legitimately bring the database down, so a timeout is not an error
@ -374,6 +378,7 @@ struct RemoveServersSafelyWorkload : TestWorkload {
// Include the servers, if unable to exclude
// Reinclude when buggify is on to increase the surface area of the next set of excludes
state bool failed = true;
if (!bClearedFirst || BUGGIFY) {
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKill")
@ -382,6 +387,8 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("ToKill", describe(toKill1))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(includeServers(cx, vector<AddressExclusion>(1)));
wait(includeLocalities(cx, vector<std::string>(), failed, true));
wait(includeLocalities(cx, vector<std::string>(), !failed, true));
self->includeAddresses(toKill1);
}
@ -418,8 +425,6 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("ToKill", describe(toKill2))
.detail("ClusterAvailable", g_simulator.isAvailable());
// Reinclude all of the machine, if buggified
if (BUGGIFY) {
// Get the updated list of processes which may have changed due to reboots, deletes, etc
TraceEvent("RemoveAndKill")
.detail("Step", "include all second")
@ -427,8 +432,9 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("ToKill", describe(toKill2))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(includeServers(cx, vector<AddressExclusion>(1)));
wait(includeLocalities(cx, vector<std::string>(), failed, true));
wait(includeLocalities(cx, vector<std::string>(), !failed, true));
self->includeAddresses(toKill2);
}
return Void();
}
@ -516,7 +522,10 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("Step", "Including all")
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
state bool failed = true;
wait(includeServers(cx, vector<AddressExclusion>(1)));
wait(includeLocalities(cx, vector<std::string>(), failed, true));
wait(includeLocalities(cx, vector<std::string>(), !failed, true));
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Included all")
.detail("ClusterAvailable", g_simulator.isAvailable())
@ -617,20 +626,60 @@ struct RemoveServersSafelyWorkload : TestWorkload {
.detail("FailedAddresses", describe(toKillMarkFailedArray))
.detail("ClusterAvailable", g_simulator.isAvailable())
.detail("MarkExcludeAsFailed", markExcludeAsFailed);
state bool excludeLocalitiesInsteadOfServers = deterministicRandom()->coinflip();
if (markExcludeAsFailed) {
if (excludeLocalitiesInsteadOfServers) {
state std::unordered_set<std::string> toKillLocalitiesFailed =
self->getLocalitiesFromAddresses(toKillMarkFailedArray);
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding localities with failed option")
.detail("FailedAddressesSize", toKillMarkFailedArray.size())
.detail("FailedAddresses", describe(toKillMarkFailedArray))
.detail("FailedLocaitiesSize", toKillLocalitiesFailed.size())
.detail("FailedLocaities", describe(toKillLocalitiesFailed));
wait(excludeLocalities(cx, toKillLocalitiesFailed, true));
} else {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding servers with failed option")
.detail("FailedAddressesSize", toKillMarkFailedArray.size())
.detail("FailedAddresses", describe(toKillMarkFailedArray));
wait(excludeServers(cx, toKillMarkFailedArray, true));
}
}
if (excludeLocalitiesInsteadOfServers) {
state std::unordered_set<std::string> toKillLocalities = self->getLocalitiesFromAddresses(toKillArray);
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding localities without failed option")
.detail("AddressesSize", toKillArray.size())
.detail("Addresses", describe(toKillArray))
.detail("LocaitiesSize", toKillLocalities.size())
.detail("Locaities", describe(toKillLocalities));
wait(excludeLocalities(cx, toKillLocalities, false));
} else {
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Excluding servers without failed option")
.detail("AddressesSize", toKillArray.size())
.detail("Addresses", describe(toKillArray));
wait(excludeServers(cx, toKillArray));
}
// We need to skip at least the quorum change if there's nothing to kill, because there might not be enough
// servers left alive to do a coordinators auto (?)
if (toKill.size()) {
if (!excludeLocalitiesInsteadOfServers) {
// Wait for removal to be safe
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "Wait For Server Exclusion")
.detail("Addresses", describe(toKill))
.detail("ClusterAvailable", g_simulator.isAvailable());
wait(success(checkForExcludingServers(cx, toKillArray, true /* wait for exclusion */)));
}
TraceEvent("RemoveAndKill", functionId)
.detail("Step", "coordinators auto")
@ -697,6 +746,60 @@ struct RemoveServersSafelyWorkload : TestWorkload {
return kill.excludes(process) || (machineProcesses.find(kill) != machineProcesses.end() &&
machineProcesses[kill].count(AddressExclusion(process.ip, process.port)) > 0);
}
// Finds the localities list that can be excluded from the safe killable addresses list.
// If excluding based on a particular locality of the safe process, kills any other process, that
// particular locality is not included in the killable localities list.
std::unordered_set<std::string> getLocalitiesFromAddresses(const std::vector<AddressExclusion>& addresses) {
std::unordered_map<std::string, int> allLocalitiesCount;
std::unordered_map<std::string, int> killableLocalitiesCount;
auto processes = getServers();
for (const auto& processInfo : processes) {
std::map<std::string, std::string> localityData = processInfo->locality.getAllData();
for (const auto& l : localityData) {
allLocalitiesCount[LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" + l.second]++;
}
AddressExclusion pAddr(processInfo->address.ip, processInfo->address.port);
if (std::find(addresses.begin(), addresses.end(), pAddr) != addresses.end()) {
for (const auto& l : localityData) {
killableLocalitiesCount[LocalityData::ExcludeLocalityPrefix.toString() + l.first + ":" +
l.second]++;
}
}
}
std::unordered_set<std::string> toKillLocalities;
for (const auto& l : killableLocalitiesCount) {
if (l.second == allLocalitiesCount[l.first]) {
toKillLocalities.insert(l.first);
}
}
return toKillLocalities;
}
// Update the g_simulator processes list with the process ids
// of the workers, that are generated as part of worker creation.
ACTOR static Future<Void> updateProcessIds(Database cx) {
std::vector<ProcessData> workers = wait(getWorkers(cx));
std::unordered_map<NetworkAddress, int> addressToIndexMap;
for (int i = 0; i < workers.size(); i++) {
addressToIndexMap[workers[i].address] = i;
}
vector<ISimulator::ProcessInfo*> processes = g_simulator.getAllProcesses();
for (auto process : processes) {
if (addressToIndexMap.find(process->address) != addressToIndexMap.end()) {
if (workers[addressToIndexMap[process->address]].locality.processId().present()) {
process->locality.set(LocalityData::keyProcessId,
workers[addressToIndexMap[process->address]].locality.processId());
}
}
}
return Void();
}
};
WorkloadFactory<RemoveServersSafelyWorkload> RemoveServersSafelyWorkloadFactory("RemoveServersSafely");