Add an verify option for \xff\xff/worker_interfaces special keys (#7873)

* Add the verify option for \xff\xff/worker_interfaces

* Remove unused code

* update documentations

* update documentations

* solve comments from review

* update some of the comments to be more clear
This commit is contained in:
Chaoguang Lin 2022-08-15 14:05:07 -04:00 committed by GitHub
parent 4317e528ab
commit 3fed0456ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 118 additions and 105 deletions

View File

@ -22,6 +22,8 @@ Each special key that existed before api version 630 is its own module. These ar
#. ``\xff\xff/cluster_file_path`` - See :ref:`cluster file client access <cluster-file-client-access>`
#. ``\xff\xff/status/json`` - See :doc:`Machine-readable status <mr-status>`
#. ``\xff\xff/worker_interfaces`` - key as the worker's network address and value as the serialized ClientWorkerInterface, not transactional
Prior to api version 630, it was also possible to read a range starting at ``\xff\xff/worker_interfaces``. This is mostly an implementation detail of fdbcli,
but it's available in api version 630 as a module with prefix ``\xff\xff/worker_interfaces/``.
@ -210,6 +212,7 @@ that process, and wait for necessary data to be moved away.
#. ``\xff\xff/management/options/failed_locality/force`` Read/write. Setting this key disables safety checks for writes to ``\xff\xff/management/failed_locality/<locality>``. Setting this key only has an effect in the current transaction and is not persisted on commit.
#. ``\xff\xff/management/tenant/map/<tenant>`` Read/write. Setting a key in this range to any value will result in a tenant being created with name ``<tenant>``. Clearing a key in this range will delete the tenant with name ``<tenant>``. Reading all or a portion of this range will return the list of tenants currently present in the cluster, excluding any changes in this transaction. Values read in this range will be JSON objects containing the metadata for the associated tenants.
#. ``\xff\xff/management/tenant/rename/<tenant>`` Read/write. Setting a key in this range to an unused tenant name will result in the tenant with the name ``<tenant>`` to be renamed to the value provided. If the rename operation is a transaction retried in a loop, it is possible for the rename to be applied twice, in which case ``tenant_not_found`` or ``tenant_already_exists`` errors may be returned. This can be avoided by checking for the tenant's existence first.
#. ``\xff\xff/management/options/worker_interfaces/verify`` Read/write. Setting this key will add a verification phase in reading ``\xff\xff/worker_interfaces``. Setting this key only has an effect in the current transaction and is not persisted on commit. Try to establish connections with every worker from the list returned by Cluster Controller and only return those workers that the client can connect to. This option is now only used in fdbcli commands ``kill``, ``suspend`` and ``expensive_data_check`` to populate the worker list.
An exclusion is syntactically either an ip address (e.g. ``127.0.0.1``), or
an ip address and port (e.g. ``127.0.0.1:4500``) or any locality (e.g ``locality_dcid:primary-satellite`` or

View File

@ -46,7 +46,7 @@ ACTOR Future<bool> expensiveDataCheckCommandActor(
if (tokens.size() == 1) {
// initialize worker interfaces
address_interface->clear();
wait(getWorkerInterfaces(tr, address_interface));
wait(getWorkerInterfaces(tr, address_interface, true));
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if (address_interface->size() == 0) {

View File

@ -44,7 +44,7 @@ ACTOR Future<bool> killCommandActor(Reference<IDatabase> db,
if (tokens.size() == 1) {
// initialize worker interfaces
address_interface->clear();
wait(getWorkerInterfaces(tr, address_interface));
wait(getWorkerInterfaces(tr, address_interface, true));
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if (address_interface->size() == 0) {

View File

@ -43,7 +43,7 @@ ACTOR Future<bool> suspendCommandActor(Reference<IDatabase> db,
if (tokens.size() == 1) {
// initialize worker interfaces
address_interface->clear();
wait(getWorkerInterfaces(tr, address_interface));
wait(getWorkerInterfaces(tr, address_interface, true));
if (address_interface->size() == 0) {
printf("\nNo addresses can be suspended.\n");
} else if (address_interface->size() == 1) {

View File

@ -62,56 +62,52 @@ ACTOR Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransacti
return valueObj["message"].get_str();
}
ACTOR Future<Void> verifyAndAddInterface(std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
Reference<FlowLock> connectLock,
KeyValue kv) {
wait(connectLock->take());
state FlowLock::Releaser releaser(*connectLock);
state ClientWorkerInterface workerInterf;
try {
// the interface is back-ward compatible, thus if parsing failed, it needs to upgrade cli version
workerInterf = BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
} catch (Error& e) {
fprintf(stderr, "Error: %s; CLI version is too old, please update to use a newer version\n", e.what());
return Void();
}
state ClientLeaderRegInterface leaderInterf(workerInterf.address());
choose {
when(Optional<LeaderInfo> rep =
wait(brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())))) {
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if (workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls"))
? full_ip_port2.removeSuffix(LiteralStringRef(":tls"))
: full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
void addInterfacesFromKVs(RangeResult& kvs,
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface) {
for (const auto& kv : kvs) {
ClientWorkerInterface workerInterf;
try {
// the interface is back-ward compatible, thus if parsing failed, it needs to upgrade cli version
workerInterf = BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
} catch (Error& e) {
fprintf(stderr, "Error: %s; CLI version is too old, please update to use a newer version\n", e.what());
return;
}
ClientLeaderRegInterface leaderInterf(workerInterf.address());
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if (workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls"))
? full_ip_port2.removeSuffix(LiteralStringRef(":tls"))
: full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
when(wait(delay(CLIENT_KNOBS->CLI_CONNECT_TIMEOUT))) {}
}
return Void();
}
ACTOR Future<Void> getWorkerInterfaces(Reference<ITransaction> tr,
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface) {
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
bool verify) {
if (verify) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr->set(workerInterfacesVerifyOptionSpecialKey, ValueRef());
}
// Hold the reference to the standalone's memory
state ThreadFuture<RangeResult> kvsFuture = tr->getRange(
KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY);
RangeResult kvs = wait(safeThreadFutureToFuture(kvsFuture));
state RangeResult kvs = wait(safeThreadFutureToFuture(kvsFuture));
ASSERT(!kvs.more);
auto connectLock = makeReference<FlowLock>(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM);
std::vector<Future<Void>> addInterfs;
for (auto it : kvs) {
addInterfs.push_back(verifyAndAddInterface(address_interface, connectLock, it));
if (verify) {
// remove the option if set
tr->clear(workerInterfacesVerifyOptionSpecialKey);
}
wait(waitForAll(addInterfs));
addInterfacesFromKVs(kvs, address_interface);
return Void();
}

View File

@ -1050,36 +1050,6 @@ Future<T> stopNetworkAfter(Future<T> what) {
}
}
ACTOR Future<Void> addInterface(std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
Reference<FlowLock> connectLock,
KeyValue kv) {
wait(connectLock->take());
state FlowLock::Releaser releaser(*connectLock);
state ClientWorkerInterface workerInterf =
BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
state ClientLeaderRegInterface leaderInterf(workerInterf.address());
choose {
when(Optional<LeaderInfo> rep =
wait(brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())))) {
StringRef ip_port =
(kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if (workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 =
StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls"))
? full_ip_port2.removeSuffix(LiteralStringRef(":tls"))
: full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
}
when(wait(delay(CLIENT_KNOBS->CLI_CONNECT_TIMEOUT))) {}
}
return Void();
}
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state LineNoise& linenoise = *plinenoise;
state bool intrans = false;

View File

@ -120,6 +120,7 @@ extern const KeyRangeRef processClassSourceSpecialKeyRange;
extern const KeyRangeRef processClassTypeSpecialKeyRange;
// Other special keys
inline const KeyRef errorMsgSpecialKey = LiteralStringRef("\xff\xff/error_message");
inline const KeyRef workerInterfacesVerifyOptionSpecialKey = "\xff\xff/management/options/worker_interfaces/verify"_sr;
// help functions (Copied from fdbcli.actor.cpp)
// get all workers' info
@ -132,13 +133,14 @@ void printUsage(StringRef command);
// Pre: tr failed with special_keys_api_failure error
// Read the error message special key and return the message
ACTOR Future<std::string> getSpecialKeysFailureErrorMessage(Reference<ITransaction> tr);
// Using \xff\xff/worker_interfaces/ special key, get all worker interfaces
// Using \xff\xff/worker_interfaces/ special key, get all worker interfaces.
// A worker list will be returned from CC.
// If verify, we will try to establish connections to all workers returned.
// In particular, it will deserialize \xff\xff/worker_interfaces/<address>:=<ClientInterface> kv pairs and issue RPC
// calls, then only return interfaces(kv pairs) the client can talk to
ACTOR Future<Void> getWorkerInterfaces(Reference<ITransaction> tr,
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface);
// Deserialize \xff\xff/worker_interfaces/<address>:=<ClientInterface> k-v pair and verify by a RPC call
ACTOR Future<Void> verifyAndAddInterface(std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
Reference<FlowLock> connectLock,
KeyValue kv);
std::map<Key, std::pair<Value, ClientLeaderRegInterface>>* address_interface,
bool verify = false);
// print cluster status info
void printStatus(StatusObjectReader statusObj,
StatusClient::StatusLevel level,

View File

@ -1279,32 +1279,6 @@ void DatabaseContext::registerSpecialKeysImpl(SpecialKeySpace::MODULE module,
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
ACTOR Future<Optional<Value>> getJSON(Database db);
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionRecord()) {
Key prefix = Key(getKeyRange().begin);
return map(getWorkerInterfaces(ryw->getDatabase()->getConnectionRecord()),
[prefix = prefix, kr = KeyRange(kr)](const RangeResult& in) {
RangeResult result;
for (const auto& [k_, v] : in) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k))
result.push_back_deep(result.arena(), KeyValueRef(k, v));
}
std::sort(result.begin(), result.end(), KeyValueRef::OrderByKey{});
return result;
});
} else {
return RangeResult();
}
}
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
};
struct SingleSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,

View File

@ -133,7 +133,8 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::actorLineageApiComman
std::set<std::string> SpecialKeySpace::options = { "excluded/force",
"failed/force",
"excluded_locality/force",
"failed_locality/force" };
"failed_locality/force",
"worker_interfaces/verify" };
std::set<std::string> SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey };
@ -2754,6 +2755,64 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
return excludeLocalityCommitActor(ryw, true);
}
// Defined in ReadYourWrites.actor.cpp
ACTOR Future<RangeResult> getWorkerInterfaces(Reference<IClusterConnectionRecord> clusterRecord);
// Defined in NativeAPI.actor.cpp
ACTOR Future<bool> verifyInterfaceActor(Reference<FlowLock> connectLock, ClientWorkerInterface workerInterf);
ACTOR static Future<RangeResult> workerInterfacesImplGetRangeActor(ReadYourWritesTransaction* ryw,
KeyRef prefix,
KeyRangeRef kr) {
if (!ryw->getDatabase().getPtr() || !ryw->getDatabase()->getConnectionRecord())
return RangeResult();
state RangeResult interfs = wait(getWorkerInterfaces(ryw->getDatabase()->getConnectionRecord()));
// for options' special keys, the boolean flag indicates if it's a SET operation
auto [verify, _] = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
"worker_interfaces", "verify")];
state RangeResult result;
if (verify) {
// if verify option is set, we try to talk to every worker and only returns those we can talk to
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
state std::vector<Future<bool>> verifyInterfs;
for (const auto& [k_, value] : interfs) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k)) {
ClientWorkerInterface workerInterf =
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion());
verifyInterfs.push_back(verifyInterfaceActor(connectLock, workerInterf));
} else {
verifyInterfs.push_back(false);
}
}
wait(waitForAll(verifyInterfs));
// state int index;
for (int index = 0; index < interfs.size(); index++) {
if (verifyInterfs[index].get()) {
// if we can establish a connection, add the kv pair into the result
result.push_back_deep(result.arena(),
KeyValueRef(interfs[index].key.withPrefix(prefix), interfs[index].value));
}
}
} else {
for (const auto& [k_, v] : interfs) {
auto k = k_.withPrefix(prefix);
if (kr.contains(k))
result.push_back_deep(result.arena(), KeyValueRef(k, v));
}
}
std::sort(result.begin(), result.end(), KeyValueRef::OrderByKey{});
return result;
}
WorkerInterfacesSpecialKeyImpl::WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
Future<RangeResult> WorkerInterfacesSpecialKeyImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const {
return workerInterfacesImplGetRangeActor(ryw, getKeyRange().begin, kr);
}
ACTOR Future<Void> validateSpecialSubrangeRead(ReadYourWritesTransaction* ryw,
KeySelector begin,
KeySelector end,

View File

@ -548,6 +548,15 @@ public:
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
};
class WorkerInterfacesSpecialKeyImpl : public SpecialKeyRangeReadImpl {
public:
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr);
Future<RangeResult> getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr,
GetRangeLimits limitsHint) const override;
};
// If the underlying set of key-value pairs of a key space is not changing, then we expect repeating a read to give the
// same result. Additionally, we can generate the expected result of any read if that read is reading a subrange. This
// actor performs a read of an arbitrary subrange of [begin, end) and validates the results.