Merge pull request #3455 from sfc-gh-clin/add-management-api-into-special-key-space

Add management api into special key space
This commit is contained in:
Andrew Noyes 2020-08-01 16:54:51 -07:00 committed by GitHub
commit 9471d93619
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1230 additions and 257 deletions

View File

@ -20,16 +20,16 @@ Consequently, the special-key-space framework wants to integrate all client func
If your feature is exposing information to clients and the results are easily formatted as key-value pairs, then you can use special-key-space to implement your client function.
## How
If you choose to use it, you need to implement a function class that inherits from `SpecialKeyRangeBaseImpl`, which has an abstract method `Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr)`.
If you choose to use it, you need to implement a function class that inherits from `SpecialKeyRangeReadImpl`, which has an abstract method `Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr)`.
This method can be treated as a callback, whose implementation details are determined by the developer.
Once you fill out the method, register the function class to the corresponding key range.
Below is a detailed example.
```c++
// Implement the function class,
// the corresponding key range is [\xff\xff/example/, \xff\xff/example/\xff)
class SKRExampleImpl : public SpecialKeyRangeBaseImpl {
class SKRExampleImpl : public SpecialKeyRangeReadImpl {
public:
explicit SKRExampleImpl(KeyRangeRef kr): SpecialKeyRangeBaseImpl(kr) {
explicit SKRExampleImpl(KeyRangeRef kr): SpecialKeyRangeReadImpl(kr) {
// Our implementation is quite simple here, the key-value pairs are formatted as:
// \xff\xff/example/<country_name> : <capital_city_name>
CountryToCapitalCity[LiteralStringRef("USA")] = LiteralStringRef("Washington, D.C.");

View File

@ -2186,6 +2186,7 @@ ACTOR Future<bool> exclude( Database db, std::vector<StringRef> tokens, Referenc
bool _safe = wait(makeInterruptable(checkSafeExclusions(db, addresses)));
safe = _safe;
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
TraceEvent("CheckSafeExclusionsError").error(e);
safe = false;
}
@ -3369,7 +3370,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
} else if (tokencmp(tokens[1], "all")) {
for( auto it : address_interface ) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first);
if (db->apiVersionAtLeast(700))
BinaryReader::fromStringRef<ClientWorkerInterface>(it.second.first, IncludeVersion())
.reboot.send(RebootRequest());
else
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to kill. You must run the `kill command before running `kill all.\n");
@ -3387,7 +3392,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(!is_error) {
for(int i = 1; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]].first);
if (db->apiVersionAtLeast(700))
BinaryReader::fromStringRef<ClientWorkerInterface>(
address_interface[tokens[i]].first, IncludeVersion())
.reboot.send(RebootRequest());
else
tr->set(LiteralStringRef("\xff\xff/reboot_worker"),
address_interface[tokens[i]].first);
}
printf("Attempted to kill %zu processes\n", tokens.size() - 1);
}
@ -3398,7 +3409,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (tokencmp(tokens[0], "suspend")) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
Standalone<RangeResultRef> kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
@ -3439,7 +3454,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
int64_t timeout_ms = seconds*1000;
tr->setOption(FDBTransactionOptions::TIMEOUT, StringRef((uint8_t *)&timeout_ms, sizeof(int64_t)));
for(int i = 2; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/suspend_worker"), address_interface[tokens[i]].first);
if (db->apiVersionAtLeast(700))
BinaryReader::fromStringRef<ClientWorkerInterface>(
address_interface[tokens[i]].first, IncludeVersion())
.reboot.send(RebootRequest(false, false, timeout_ms));
else
tr->set(LiteralStringRef("\xff\xff/suspend_worker"),
address_interface[tokens[i]].first);
}
printf("Attempted to suspend %zu processes\n", tokens.size() - 2);
}
@ -3691,13 +3712,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
continue;
}
getTransaction(db, tr, options, intrans);
Standalone<RangeResultRef> kvs = wait(makeInterruptable(
tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"),
LiteralStringRef("\xff\xff\xff")),
1)));
Standalone<RangeResultRef> kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
std::map<Key, ClientWorkerInterface> interfaces;
for (const auto& pair : kvs) {
auto ip_port = pair.key.endsWith(LiteralStringRef(":tls")) ? pair.key.removeSuffix(LiteralStringRef(":tls")) : pair.key;
auto ip_port = (pair.key.endsWith(LiteralStringRef(":tls"))
? pair.key.removeSuffix(LiteralStringRef(":tls"))
: pair.key)
.removePrefix(LiteralStringRef("\xff\xff/worker_interfaces/"));
interfaces.emplace(ip_port, BinaryReader::fromStringRef<ClientWorkerInterface>(pair.value, IncludeVersion()));
}
state Key ip_port = tokens[2];
@ -3722,7 +3747,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if (tokencmp(tokens[0], "expensive_data_check")) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
Standalone<RangeResultRef> kvs = wait(
makeInterruptable(tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY)));
ASSERT(!kvs.more);
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
@ -3744,7 +3773,11 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
} else if (tokencmp(tokens[1], "all")) {
for( auto it : address_interface ) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first);
if (db->apiVersionAtLeast(700))
BinaryReader::fromStringRef<ClientWorkerInterface>(it.second.first, IncludeVersion())
.reboot.send(RebootRequest(false, true));
else
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to check. You must run the `expensive_data_check command before running `expensive_data_check all.\n");
@ -3762,7 +3795,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(!is_error) {
for(int i = 1; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]].first);
if (db->apiVersionAtLeast(700))
BinaryReader::fromStringRef<ClientWorkerInterface>(
address_interface[tokens[i]].first, IncludeVersion())
.reboot.send(RebootRequest(false, true));
else
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"),
address_interface[tokens[i]].first);
}
printf("Attempted to kill and check %zu processes\n", tokens.size() - 1);
}

View File

@ -339,11 +339,13 @@ public:
double detailedHealthMetricsLastUpdated;
UniqueOrderedOptionList<FDBTransactionOptions> transactionDefaults;
Future<Void> cacheListMonitor;
AsyncTrigger updateCache;
std::vector<std::unique_ptr<SpecialKeyRangeBaseImpl>> specialKeySpaceModules;
std::vector<std::unique_ptr<SpecialKeyRangeReadImpl>> specialKeySpaceModules;
std::unique_ptr<SpecialKeySpace> specialKeySpace;
void registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module, std::unique_ptr<SpecialKeyRangeBaseImpl> impl);
void registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module, SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl> impl);
static bool debugUseTags;
static const std::vector<std::string> debugTransactionTagChoices;

View File

@ -38,7 +38,6 @@
#include "fdbrpc/Replication.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<vector<AddressExclusion>> getExcludedServers(Transaction* tr);
bool isInteger(const std::string& s) {
if( s.empty() ) return false;
@ -1264,93 +1263,152 @@ struct AutoQuorumChange : IQuorumChange {
};
Reference<IQuorumChange> autoQuorumChange( int desired ) { return Reference<IQuorumChange>(new AutoQuorumChange(desired)); }
// Stages, on the caller-supplied transaction, the writes that mark `servers` as excluded
// (or as failed when `failed` is true). Nothing is committed here; the caller commits `tr`.
//
// tr      - transaction to populate; system-key access, priority, lock-aware and
//           provisional-proxy options are set on it.
// servers - addresses to mark.
// failed  - selects the failed-servers keys instead of the excluded-servers keys.
void excludeServers(Transaction& tr, vector<AddressExclusion>& servers, bool failed) {
	tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
	tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
	tr.setOption(FDBTransactionOptions::LOCK_AWARE);
	tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);

	// A fresh random value is written to the version key on every call.
	const std::string newVersionValue = deterministicRandom()->randomUniqueID().toString();
	const auto versionKey = failed ? failedServersVersionKey : excludedServersVersionKey;
	tr.addReadConflictRange(singleKeyRange(versionKey)); // To conflict with parallel includeServers
	tr.set(versionKey, newVersionValue);

	// One empty-valued marker key per server, in the keyspace chosen by `failed`.
	for (auto& server : servers) {
		tr.set(failed ? encodeFailedServersKey(server) : encodeExcludedServersKey(server), StringRef());
	}

	TraceEvent("ExcludeServersCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed);
}
ACTOR Future<Void> excludeServers(Database cx, vector<AddressExclusion> servers, bool failed) {
state Transaction tr(cx);
state Key versionKey = BinaryWriter::toValue(deterministicRandom()->randomUniqueID(),Unversioned());
state std::string excludeVersionKey = deterministicRandom()->randomUniqueID().toString();
loop {
try {
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
auto serversVersionKey = failed ? failedServersVersionKey : excludedServersVersionKey;
tr.addReadConflictRange( singleKeyRange(serversVersionKey) ); //To conflict with parallel includeServers
tr.set( serversVersionKey, excludeVersionKey );
for(auto& s : servers) {
if (failed) {
tr.set( encodeFailedServersKey(s), StringRef() );
} else {
tr.set( encodeExcludedServersKey(s), StringRef() );
if (cx->apiVersionAtLeast(700)) {
state ReadYourWritesTransaction ryw(cx);
loop {
try{
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
ryw.set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey(failed ? "failed" : "exclude", "force"), ValueRef());
for(auto& s : servers) {
Key addr = failed ? SpecialKeySpace::getManagementApiCommandPrefix("failed").withSuffix(s.toString())
: SpecialKeySpace::getManagementApiCommandPrefix("exclude").withSuffix(s.toString());
ryw.set(addr, ValueRef());
}
TraceEvent("ExcludeServersSpecialKeySpaceCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed);
wait(ryw.commit());
return Void();
} catch (Error& e) {
wait( ryw.onError(e) );
}
}
} else {
state Transaction tr(cx);
loop {
try {
excludeServers(tr, servers, failed);
wait( tr.commit() );
return Void();
} catch (Error& e) {
wait( tr.onError(e) );
}
TraceEvent("ExcludeServersCommit").detail("Servers", describe(servers)).detail("ExcludeFailed", failed);
wait( tr.commit() );
return Void();
} catch (Error& e) {
wait( tr.onError(e) );
}
}
}
ACTOR Future<Void> includeServers(Database cx, vector<AddressExclusion> servers, bool failed) {
state Transaction tr(cx);
state std::string versionKey = deterministicRandom()->randomUniqueID().toString();
loop {
try {
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
// includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
if (failed) {
tr.addReadConflictRange(singleKeyRange(failedServersVersionKey));
tr.set(failedServersVersionKey, versionKey);
} else {
tr.addReadConflictRange(singleKeyRange(excludedServersVersionKey));
tr.set(excludedServersVersionKey, versionKey);
}
for(auto& s : servers ) {
if (!s.isValid()) { // Include all excluded servers if input servers are invalid
if (failed) {
tr.clear(failedServersKeys);
if (cx->apiVersionAtLeast(700)) {
state ReadYourWritesTransaction ryw(cx);
loop {
try {
ryw.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (auto& s : servers) {
if (!s.isValid()) {
if (failed) {
ryw.clear(SpecialKeySpace::getManamentApiCommandRange("failed"));
} else {
ryw.clear(SpecialKeySpace::getManamentApiCommandRange("exclude"));
}
} else {
tr.clear(excludedServersKeys);
}
} else if (s.isWholeMachine()) {
// Eliminate both any ip-level exclusion (1.2.3.4) and any
// port-level exclusions (1.2.3.4:5)
// The range ['IP', 'IP;'] was originally deleted. ';' is
// char(':' + 1). This does not work, because for every
// x between 0 and 9, 'IPx' will also be in this range.
//
// This is why we now make two clears: first only of the ip
// address, the second will delete all ports.
auto addr = failed ? encodeFailedServersKey(s) : encodeExcludedServersKey(s);
tr.clear(singleKeyRange(addr));
tr.clear(KeyRangeRef(addr + ':', addr + char(':' + 1)));
} else {
if (failed) {
tr.clear(encodeFailedServersKey(s));
} else {
tr.clear(encodeExcludedServersKey(s));
Key addr = failed ? SpecialKeySpace::getManagementApiCommandPrefix("failed").withSuffix(s.toString())
: SpecialKeySpace::getManagementApiCommandPrefix("exclude").withSuffix(s.toString());
ryw.clear(addr);
// Eliminate both any ip-level exclusion (1.2.3.4) and any
// port-level exclusions (1.2.3.4:5)
// The range ['IP', 'IP;'] was originally deleted. ';' is
// char(':' + 1). This does not work, because for every
// x between 0 and 9, 'IPx' will also be in this range.
//
// This is why we now make two clears: first only of the ip
// address, the second will delete all ports.
if (s.isWholeMachine())
ryw.clear(KeyRangeRef(addr.withSuffix(LiteralStringRef(":")), addr.withSuffix(LiteralStringRef(";"))));
}
}
TraceEvent("IncludeServersCommit").detail("Servers", describe(servers)).detail("Failed", failed);
wait( ryw.commit() );
return Void();
} catch (Error& e) {
TraceEvent("IncludeServersError").error(e, true);
wait( ryw.onError(e) );
}
}
} else {
state Transaction tr(cx);
loop {
try {
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
tr.setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
TraceEvent("IncludeServersCommit").detail("Servers", describe(servers)).detail("Failed", failed);
// includeServers might be used in an emergency transaction, so make sure it is retry-self-conflicting and CAUSAL_WRITE_RISKY
tr.setOption( FDBTransactionOptions::CAUSAL_WRITE_RISKY );
if (failed) {
tr.addReadConflictRange(singleKeyRange(failedServersVersionKey));
tr.set(failedServersVersionKey, versionKey);
} else {
tr.addReadConflictRange(singleKeyRange(excludedServersVersionKey));
tr.set(excludedServersVersionKey, versionKey);
}
wait( tr.commit() );
return Void();
} catch (Error& e) {
TraceEvent("IncludeServersError").error(e, true);
wait( tr.onError(e) );
for(auto& s : servers ) {
if (!s.isValid()) {
if (failed) {
tr.clear(failedServersKeys);
} else {
tr.clear(excludedServersKeys);
}
} else if (s.isWholeMachine()) {
// Eliminate both any ip-level exclusion (1.2.3.4) and any
// port-level exclusions (1.2.3.4:5)
// The range ['IP', 'IP;'] was originally deleted. ';' is
// char(':' + 1). This does not work, because for every
// x between 0 and 9, 'IPx' will also be in this range.
//
// This is why we now make two clears: first only of the ip
// address, the second will delete all ports.
auto addr = failed ? encodeFailedServersKey(s) : encodeExcludedServersKey(s);
tr.clear(singleKeyRange(addr));
tr.clear(KeyRangeRef(addr + ':', addr + char(':' + 1)));
} else {
if (failed) {
tr.clear(encodeFailedServersKey(s));
} else {
tr.clear(encodeExcludedServersKey(s));
}
}
}
TraceEvent("IncludeServersCommit").detail("Servers", describe(servers)).detail("Failed", failed);
wait( tr.commit() );
return Void();
} catch (Error& e) {
TraceEvent("IncludeServersError").error(e, true);
wait( tr.onError(e) );
}
}
}
}
@ -1389,7 +1447,7 @@ ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClass
}
}
ACTOR static Future<vector<AddressExclusion>> getExcludedServers( Transaction* tr ) {
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Transaction* tr ) {
state Standalone<RangeResultRef> r = wait( tr->getRange( excludedServersKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !r.more && r.size() < CLIENT_KNOBS->TOO_MANY );
state Standalone<RangeResultRef> r2 = wait( tr->getRange( failedServersKeys, CLIENT_KNOBS->TOO_MANY ) );
@ -1559,59 +1617,67 @@ ACTOR Future<int> setDDMode( Database cx, int mode ) {
}
}
// Performs one pass, inside the caller's transaction, over the storage-server list and the log
// configuration to see whether any process matching `exclusions` is still in use.
//
// tr                  - transaction used for the reads; read-system-keys, system-immediate priority
//                       and lock-aware options are set on it here.
// exclusions          - the exclusions to check against; if empty the actor returns true immediately
//                       without reading anything.
// inProgressExclusion - output set; must be empty on entry. Every address found to still match an
//                       exclusion is inserted into it.
//
// Returns true when no matching storage server or log address was found, false otherwise.
// The log configuration is only read when the storage-server check found nothing.
ACTOR Future<bool> checkForExcludingServersTxActor(ReadYourWritesTransaction* tr,
std::set<AddressExclusion>* exclusions,
std::set<NetworkAddress>* inProgressExclusion) {
// TODO : replace using ExclusionInProgressRangeImpl in special key space
ASSERT(inProgressExclusion->size() == 0); // Make sure every time it is cleared beforehand
if (!exclusions->size()) return true;
tr->setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
tr->setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
tr->setOption( FDBTransactionOptions::LOCK_AWARE );
// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery
// Check that there aren't any storage servers with addresses violating the exclusions
Standalone<RangeResultRef> serverList = wait( tr->getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
// Declared `state` because it is read again after the second wait() below.
state bool ok = true;
for(auto& s : serverList) {
auto addresses = decodeServerListValue( s.value ).getKeyValues.getEndpoint().addresses;
// Both the primary and (when present) the secondary address are tested independently.
if ( addressExcluded(*exclusions, addresses.address) ) {
ok = false;
inProgressExclusion->insert(addresses.address);
}
if ( addresses.secondaryAddress.present() && addressExcluded(*exclusions, addresses.secondaryAddress.get()) ) {
ok = false;
inProgressExclusion->insert(addresses.secondaryAddress.get());
}
}
if (ok) {
Optional<Standalone<StringRef>> value = wait( tr->get(logsKey) );
ASSERT(value.present());
// NOTE(review): logs.first / logs.second are presumably the current and previous log sets --
// confirm against decodeLogsValue.
auto logs = decodeLogsValue(value.get());
for( auto const& log : logs.first ) {
// A default-constructed address also counts as "not yet done" and is itself recorded.
if (log.second == NetworkAddress() || addressExcluded(*exclusions, log.second)) {
ok = false;
inProgressExclusion->insert(log.second);
}
}
for( auto const& log : logs.second ) {
if (log.second == NetworkAddress() || addressExcluded(*exclusions, log.second)) {
ok = false;
inProgressExclusion->insert(log.second);
}
}
}
return ok;
}
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> excl,
bool waitForAllExcluded) {
state std::set<AddressExclusion> exclusions( excl.begin(), excl.end() );
state std::set<NetworkAddress> inProgressExclusion;
if (!excl.size()) return inProgressExclusion;
loop {
state Transaction tr(cx);
state ReadYourWritesTransaction tr(cx);
inProgressExclusion.clear();
try {
tr.setOption( FDBTransactionOptions::READ_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); // necessary?
tr.setOption( FDBTransactionOptions::LOCK_AWARE );
// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed recovery
// Check that there aren't any storage servers with addresses violating the exclusions
Standalone<RangeResultRef> serverList = wait( tr.getRange( serverListKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY );
state bool ok = true;
inProgressExclusion.clear();
for(auto& s : serverList) {
auto addresses = decodeServerListValue( s.value ).getKeyValues.getEndpoint().addresses;
if ( addressExcluded(exclusions, addresses.address) ) {
ok = false;
inProgressExclusion.insert(addresses.address);
}
if ( addresses.secondaryAddress.present() && addressExcluded(exclusions, addresses.secondaryAddress.get()) ) {
ok = false;
inProgressExclusion.insert(addresses.secondaryAddress.get());
}
}
if (ok) {
Optional<Standalone<StringRef>> value = wait( tr.get(logsKey) );
ASSERT(value.present());
auto logs = decodeLogsValue(value.get());
for( auto const& log : logs.first ) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
ok = false;
inProgressExclusion.insert(log.second);
}
}
for( auto const& log : logs.second ) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
ok = false;
inProgressExclusion.insert(log.second);
}
}
}
bool ok = wait(checkForExcludingServersTxActor(&tr, &exclusions, &inProgressExclusion));
if (ok) return inProgressExclusion;
if (!waitForAllExcluded) break;
@ -1620,7 +1686,6 @@ ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vec
wait( tr.onError(e) );
}
}
return inProgressExclusion;
}

View File

@ -144,6 +144,7 @@ Reference<IQuorumChange> nameQuorumChange(std::string const& name, Reference<IQu
// Exclude the given set of servers from use as state servers. Returns as soon as the change is durable, without necessarily waiting for
// the servers to be evacuated. A NetworkAddress with a port of 0 means all servers on the given IP.
ACTOR Future<Void> excludeServers( Database cx, vector<AddressExclusion> servers, bool failed = false );
void excludeServers(Transaction& tr, vector<AddressExclusion>& servers, bool failed = false);
// Remove the given servers from the exclusion list. A NetworkAddress with a port of 0 means all servers on the given IP. A NetworkAddress() means
// all servers (don't exclude anything)
@ -154,12 +155,16 @@ ACTOR Future<Void> setClass( Database cx, AddressExclusion server, ProcessClas
// Get the current list of excluded servers
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Database cx );
ACTOR Future<vector<AddressExclusion>> getExcludedServers( Transaction* tr);
// Check for the given, previously excluded servers to be evacuated (no longer used for state). If waitForAllExcluded is
// true, this actor returns once it is safe to shut down all such machines without impacting fault tolerance, until and
// unless any of them are explicitly included with includeServers()
ACTOR Future<std::set<NetworkAddress>> checkForExcludingServers(Database cx, vector<AddressExclusion> servers,
bool waitForAllExcluded);
ACTOR Future<bool> checkForExcludingServersTxActor(ReadYourWritesTransaction* tr,
std::set<AddressExclusion>* exclusions,
std::set<NetworkAddress>* inProgressExclusion);
// Gets a list of all workers in the cluster (excluding testers)
ACTOR Future<vector<ProcessData>> getWorkers( Database cx );

View File

@ -696,15 +696,16 @@ Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
return getHealthMetricsActor(this, detailed);
}
void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module, std::unique_ptr<SpecialKeyRangeBaseImpl> impl) {
specialKeySpace->registerKeyRange(module, impl->getKeyRange(), impl.get());
void DatabaseContext::registerSpecialKeySpaceModule(SpecialKeySpace::MODULE module, SpecialKeySpace::IMPLTYPE type,
std::unique_ptr<SpecialKeyRangeReadImpl> impl) {
specialKeySpace->registerKeyRange(module, type, impl->getKeyRange(), impl.get());
specialKeySpaceModules.push_back(std::move(impl));
}
ACTOR Future<Standalone<RangeResultRef>> getWorkerInterfaces(Reference<ClusterConnectionFile> clusterFile);
ACTOR Future<Optional<Value>> getJSON(Database db);
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeBaseImpl {
struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
if (ryw->getDatabase().getPtr() && ryw->getDatabase()->getConnectionFile()) {
Key prefix = Key(getKeyRange().begin);
@ -724,10 +725,10 @@ struct WorkerInterfacesSpecialKeyImpl : SpecialKeyRangeBaseImpl {
}
}
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
};
struct SingleSpecialKeyImpl : SpecialKeyRangeBaseImpl {
struct SingleSpecialKeyImpl : SpecialKeyRangeReadImpl {
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override {
ASSERT(kr.contains(k));
return map(f(ryw), [k = k](Optional<Value> v) {
@ -740,7 +741,7 @@ struct SingleSpecialKeyImpl : SpecialKeyRangeBaseImpl {
}
SingleSpecialKeyImpl(KeyRef k, const std::function<Future<Optional<Value>>(ReadYourWritesTransaction*)>& f)
: SpecialKeyRangeBaseImpl(singleKeyRange(k)), k(k), f(f) {}
: SpecialKeyRangeReadImpl(singleKeyRange(k)), k(k), f(f) {}
private:
Key k;
@ -894,20 +895,52 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
cacheListMonitor = monitorCacheList(this);
if (apiVersionAtLeast(700)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::ERRORMSG, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin,
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getSpecialKeySpaceErrorMsg().present())
return Optional<Value>(ryw->getSpecialKeySpaceErrorMsg().get());
else
return Optional<Value>();
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ManagementCommandsOptionsImpl>(
KeyRangeRef(LiteralStringRef("options/"), LiteralStringRef("options0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<ExcludeServersRangeImpl>(SpecialKeySpace::getManamentApiCommandRange("exclude")));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<FailedServersRangeImpl>(SpecialKeySpace::getManamentApiCommandRange("failed")));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ExclusionInProgressRangeImpl>(
KeyRangeRef(LiteralStringRef("inProgressExclusion/"), LiteralStringRef("inProgressExclusion0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::METRICS,
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ConflictingKeysImpl>(conflictingKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<ReadConflictRangeImpl>(readConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WriteConflictRangeImpl>(writeConflictRangeKeysRange));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::METRICS, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<DDStatsRangeImpl>(ddStatsRange));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::METRICS,
SpecialKeySpace::MODULE::METRICS, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<HealthMetricsRangeImpl>(KeyRangeRef(LiteralStringRef("\xff\xff/metrics/health/"),
LiteralStringRef("\xff\xff/metrics/health0"))));
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::WORKERINTERFACE, std::make_unique<WorkerInterfacesSpecialKeyImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0"))));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::STATUSJSON,
SpecialKeySpace::MODULE::WORKERINTERFACE, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<WorkerInterfacesSpecialKeyImpl>(KeyRangeRef(
LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0"))));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::STATUSJSON, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(LiteralStringRef("\xff\xff/status/json"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
if (ryw->getDatabase().getPtr() &&
@ -918,7 +951,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
}
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CLUSTERFILEPATH,
SpecialKeySpace::MODULE::CLUSTERFILEPATH, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/cluster_file_path"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {
@ -934,7 +967,7 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
}));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CONNECTIONSTRING,
SpecialKeySpace::MODULE::CONNECTIONSTRING, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<SingleSpecialKeyImpl>(
LiteralStringRef("\xff\xff/connection_string"),
[](ReadYourWritesTransaction* ryw) -> Future<Optional<Value>> {

View File

@ -1045,6 +1045,9 @@ public:
try {
ryw->commitStarted = true;
if (ryw->options.specialKeySpaceChangeConfiguration)
wait(ryw->getDatabase()->specialKeySpace->commit(ryw));
Future<Void> ready = ryw->reading;
wait( ryw->resetPromise.getFuture() || ready );
@ -1160,7 +1163,8 @@ public:
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx)
: cache(&arena), writes(&arena), tr(cx), retries(0), approximateSize(0), creationTime(now()), commitStarted(false),
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()) {
options(tr), deferredError(cx->deferredError), versionStampFuture(tr.getVersionstamp()),
specialKeySpaceWriteMap(std::make_pair(false, Optional<Value>()), specialKeys.end) {
std::copy(cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(),
std::back_inserter(persistentOptions));
applyPersistentOptions();
@ -1755,24 +1759,34 @@ void ReadYourWritesTransaction::atomicOp( const KeyRef& key, const ValueRef& ope
}
void ReadYourWritesTransaction::set( const KeyRef& key, const ValueRef& value ) {
if (key.startsWith(systemKeys.end)) {
if (key == LiteralStringRef("\xff\xff/reboot_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest() );
return;
}
if (key == LiteralStringRef("\xff\xff/suspend_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, false, options.timeoutInSeconds) );
return;
}
if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion()).reboot.send( RebootRequest(false, true) );
return;
}
}
if (key == metadataVersionKey) {
throw client_invalid_operation();
}
if (specialKeys.contains(key)) {
if (getDatabase()->apiVersionAtLeast(700)) {
return getDatabase()->specialKeySpace->set(this, key, value);
} else {
// These three special keys are deprecated in 7.0 and an alternative C API is added
// TODO : Rewrite related code using C api
if (key == LiteralStringRef("\xff\xff/reboot_worker")) {
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion())
.reboot.send(RebootRequest());
return;
}
if (key == LiteralStringRef("\xff\xff/suspend_worker")){
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion())
.reboot.send(RebootRequest(false, false, options.timeoutInSeconds));
return;
}
if (key == LiteralStringRef("\xff\xff/reboot_and_check_worker")) {
BinaryReader::fromStringRef<ClientWorkerInterface>(value, IncludeVersion())
.reboot.send(RebootRequest(false, true));
return;
}
}
}
bool addWriteConflict = !options.getAndResetWriteConflictDisabled();
if(checkUsedDuringCommit()) {
@ -1808,6 +1822,12 @@ void ReadYourWritesTransaction::clear( const KeyRangeRef& range ) {
throw used_during_commit();
}
if (specialKeys.contains(range)) {
if (getDatabase()->apiVersionAtLeast(700)) {
return getDatabase()->specialKeySpace->clear(this, range);
}
}
KeyRef maxKey = getMaxWriteKey();
if(range.begin > maxKey || range.end > maxKey)
throw key_outside_legal_range();
@ -1847,6 +1867,12 @@ void ReadYourWritesTransaction::clear( const KeyRef& key ) {
throw used_during_commit();
}
if (specialKeys.contains(key)) {
if (getDatabase()->apiVersionAtLeast(700)) {
return getDatabase()->specialKeySpace->clear(this, key);
}
}
if(key >= getMaxWriteKey())
throw key_outside_legal_range();
@ -2023,6 +2049,14 @@ void ReadYourWritesTransaction::setOptionImpl( FDBTransactionOptions::Option opt
case FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED:
validateOptionValue(value, false);
options.specialKeySpaceRelaxed = true;
break;
case FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES:
validateOptionValue(value, false);
options.specialKeySpaceChangeConfiguration = true;
// By default, it allows to read system keys
// More options will be implicitly enabled if needed when doing set or clear
options.readSystemKeys = true;
break;
default:
break;
}
@ -2054,6 +2088,7 @@ void ReadYourWritesTransaction::operator=(ReadYourWritesTransaction&& r) noexcep
nativeReadRanges = std::move(r.nativeReadRanges);
nativeWriteRanges = std::move(r.nativeWriteRanges);
versionStampKeys = std::move(r.versionStampKeys);
specialKeySpaceWriteMap = std::move(r.specialKeySpaceWriteMap);
}
ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&& r) noexcept
@ -2072,6 +2107,7 @@ ReadYourWritesTransaction::ReadYourWritesTransaction(ReadYourWritesTransaction&&
nativeReadRanges = std::move(r.nativeReadRanges);
nativeWriteRanges = std::move(r.nativeWriteRanges);
versionStampKeys = std::move(r.versionStampKeys);
specialKeySpaceWriteMap = std::move(r.specialKeySpaceWriteMap);
}
Future<Void> ReadYourWritesTransaction::onError(Error const& e) {
@ -2109,6 +2145,9 @@ void ReadYourWritesTransaction::resetRyow() {
versionStampKeys = VectorRef<KeyRef>();
nativeReadRanges = Standalone<VectorRef<KeyRangeRef>>();
nativeWriteRanges = Standalone<VectorRef<KeyRangeRef>>();
specialKeySpaceWriteMap =
KeyRangeMap<std::pair<bool, Optional<Value>>>(std::make_pair(false, Optional<Value>()), specialKeys.end);
specialKeySpaceErrorMsg.reset();
watchMap.clear();
reading = AndFuture();
approximateSize = 0;

View File

@ -38,6 +38,7 @@ struct ReadYourWritesTransactionOptions {
bool debugRetryLogging : 1;
bool disableUsedDuringCommitProtection : 1;
bool specialKeySpaceRelaxed : 1;
bool specialKeySpaceChangeConfiguration : 1;
double timeoutInSeconds;
int maxRetries;
int snapshotRywEnabled;
@ -148,6 +149,13 @@ public:
Standalone<RangeResultRef> getWriteConflictRangeIntersecting(KeyRangeRef kr);
bool specialKeySpaceRelaxed() const { return options.specialKeySpaceRelaxed; }
bool specialKeySpaceChangeConfiguration() const { return options.specialKeySpaceChangeConfiguration; }
KeyRangeMap<std::pair<bool, Optional<Value>>>& getSpecialKeySpaceWriteMap() { return specialKeySpaceWriteMap; }
bool readYourWritesDisabled() const { return options.readYourWritesDisabled; }
const Optional<std::string>& getSpecialKeySpaceErrorMsg() { return specialKeySpaceErrorMsg; }
void setSpecialKeySpaceErrorMsg(const std::string& msg) { specialKeySpaceErrorMsg = msg; }
Transaction& getTransaction() { return tr; }
private:
friend class RYWImpl;
@ -176,6 +184,9 @@ private:
Reference<TransactionDebugInfo> transactionDebugInfo;
KeyRangeMap<std::pair<bool, Optional<Value>>> specialKeySpaceWriteMap;
Optional<std::string> specialKeySpaceErrorMsg;
void resetTimeout();
void updateConflictMap( KeyRef const& key, WriteMap::iterator& it ); // pre: it.segmentContains(key)
void updateConflictMap( KeyRangeRef const& keys, WriteMap::iterator& it ); // pre: it.segmentContains(keys.begin), keys are already inside this->arena

View File

@ -922,3 +922,11 @@ const KeyRef JSONSchemas::aggregateHealthSchema = LiteralStringRef(R"""(
"worst_log_queue": 156
}
)""");
// Sample JSON document describing a management API error: a "retriable" flag, the "command" that
// failed, and a human readable "message".
// NOTE(review): presumably mirrors the output of ManagementAPIError::toJsonString — confirm.
const KeyRef JSONSchemas::managementApiErrorSchema = LiteralStringRef(R"""(
{
"retriable": false,
"command": "exclude",
"message": "The reason of the error"
}
)""");

View File

@ -34,6 +34,7 @@ struct JSONSchemas {
static const KeyRef logHealthSchema;
static const KeyRef storageHealthSchema;
static const KeyRef aggregateHealthSchema;
static const KeyRef managementApiErrorSchema;
};
#endif /* FDBCLIENT_SCHEMAS_H */

View File

@ -20,6 +20,8 @@
#include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/UnitTest.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/StatusClient.h"
#include "flow/actorcompiler.h" // This must be the last #include.
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
@ -31,16 +33,28 @@ std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToB
{ SpecialKeySpace::MODULE::CONNECTIONSTRING, singleKeyRange(LiteralStringRef("\xff\xff/connection_string")) },
{ SpecialKeySpace::MODULE::CLUSTERFILEPATH, singleKeyRange(LiteralStringRef("\xff\xff/cluster_file_path")) },
{ SpecialKeySpace::MODULE::METRICS,
KeyRangeRef(LiteralStringRef("\xff\xff/metrics/"), LiteralStringRef("\xff\xff/metrics0")) }
KeyRangeRef(LiteralStringRef("\xff\xff/metrics/"), LiteralStringRef("\xff\xff/metrics0")) },
{ SpecialKeySpace::MODULE::MANAGEMENT,
KeyRangeRef(LiteralStringRef("\xff\xff/management/"), LiteralStringRef("\xff\xff/management0")) },
{ SpecialKeySpace::MODULE::ERRORMSG, singleKeyRange(LiteralStringRef("\xff\xff/error_message")) }
};
// Maps a management API command name ("exclude", "failed") to the special key range it uses.
// Each range is a relative range ("excluded/".."excluded0", "failed/".."failed0") prefixed with the
// begin key of the MANAGEMENT module boundary taken from moduleToBoundary.
std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandToRange = {
{ "exclude", KeyRangeRef(LiteralStringRef("excluded/"), LiteralStringRef("excluded0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "failed", KeyRangeRef(LiteralStringRef("failed/"), LiteralStringRef("failed0"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
};
// Names of the recognized management options, each spelled "<command>/<option>".
std::unordered_set<std::string> SpecialKeySpace::options = { "exclude/force", "failed/force" };
// This function will move the given KeySelector as far as possible to the standard form:
// orEqual == false && offset == 1 (Standard form)
// If the corresponding key is not in the underlying key range, it will move over the range
// The cache object is used to cache the first read result from the rpc call during the key resolution.
// When we later need to do key resolution or result filtering,
// we read from this cache object instead of issuing another rpc call, so that the results stay consistent
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl* skrImpl, ReadYourWritesTransaction* ryw,
ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeReadImpl* skrImpl, ReadYourWritesTransaction* ryw,
KeySelector* ks, Optional<Standalone<RangeResultRef>>* cache) {
ASSERT(!ks->orEqual); // should be removed before calling
ASSERT(ks->offset != 1); // never being called if KeySelector is already normalized
@ -91,7 +105,9 @@ ACTOR Future<Void> moveKeySelectorOverRangeActor(const SpecialKeyRangeBaseImpl*
ks->setKey(KeyRef(ks->arena(), result[ks->offset - 1].key));
ks->offset = 1;
} else {
ks->setKey(KeyRef(ks->arena(), keyAfter(result[result.size() - 1].key)));
ks->setKey(KeyRef(
ks->arena(),
keyAfter(result[result.size() - 1].key))); // TODO : the keyAfter will just return if key == \xff\xff
ks->offset -= result.size();
}
}
@ -115,9 +131,9 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
KeyRangeRef boundary, int* actualOffset,
Standalone<RangeResultRef>* result,
Optional<Standalone<RangeResultRef>>* cache) {
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::iterator iter =
ks->offset < 1 ? sks->getImpls().rangeContainingKeyBefore(ks->getKey())
: sks->getImpls().rangeContaining(ks->getKey());
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::iterator iter =
ks->offset < 1 ? sks->getReadImpls().rangeContainingKeyBefore(ks->getKey())
: sks->getReadImpls().rangeContaining(ks->getKey());
while ((ks->offset < 1 && iter->begin() > boundary.begin) || (ks->offset > 1 && iter->begin() < boundary.end)) {
if (iter->value() != nullptr) {
wait(moveKeySelectorOverRangeActor(iter->value(), ryw, ks, cache));
@ -141,6 +157,30 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
return Void();
}
// Builds the special key space over [spaceStartKey, spaceEndKey).
// Both impl maps start out with no implementation (nullptr) everywhere; the module map defaults to
// TESTONLY when testOnly is set and to UNKNOWN otherwise.
SpecialKeySpace::SpecialKeySpace(KeyRef spaceStartKey, KeyRef spaceEndKey, bool testOnly)
  : range(KeyRangeRef(spaceStartKey, spaceEndKey)), readImpls(nullptr, spaceEndKey), writeImpls(nullptr, spaceEndKey),
    modules(testOnly ? SpecialKeySpace::MODULE::TESTONLY : SpecialKeySpace::MODULE::UNKNOWN, spaceEndKey) {
    // A KeyRangeMap begins at Key() by default, so insert the full range to move the start key
    writeImpls.insert(range, nullptr);
    readImpls.insert(range, nullptr);
    // testOnly is used in the correctness workload, which does not install module boundaries
    if (testOnly) {
        return;
    }
    modulesBoundaryInit();
}
// Installs every entry of moduleToBoundary into the module map and splits both impl maps at the
// same boundaries (with no implementation registered yet).
void SpecialKeySpace::modulesBoundaryInit() {
    for (const auto& entry : moduleToBoundary) {
        const auto& moduleId = entry.first;
        const auto& boundary = entry.second;
        ASSERT(range.contains(boundary));
        // The module must not overlap any module that is already installed.
        // Note: as with ranges, one module's end cannot be another module's start; relax the
        // condition if needed
        ASSERT(modules.rangeContaining(boundary.begin) == modules.rangeContaining(boundary.end) &&
               modules[boundary.begin] == SpecialKeySpace::MODULE::UNKNOWN);
        modules.insert(boundary, moduleId);
        // Note: due to the underlying implementation, these insertions matter for cross_module_read
        // to be handled correctly
        readImpls.insert(boundary, nullptr);
        writeImpls.insert(boundary, nullptr);
    }
}
ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::checkRYWValid(SpecialKeySpace* sks,
ReadYourWritesTransaction* ryw,
KeySelector begin, KeySelector end,
@ -164,7 +204,7 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
// KeySelector, GetRangeLimits and reverse are all handled here
state Standalone<RangeResultRef> result;
state Standalone<RangeResultRef> pairs;
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::iterator iter;
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::iterator iter;
state int actualBeginOffset;
state int actualEndOffset;
state KeyRangeRef moduleBoundary;
@ -203,8 +243,8 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
TEST(true);
return result;
}
state RangeMap<Key, SpecialKeyRangeBaseImpl*, KeyRangeRef>::Ranges ranges =
sks->impls.intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
state RangeMap<Key, SpecialKeyRangeReadImpl*, KeyRangeRef>::Ranges ranges =
sks->getReadImpls().intersectingRanges(KeyRangeRef(begin.getKey(), end.getKey()));
// TODO : workaround to write this two together to make the code compact
// The issue here is boost::iterator_range<> doest not provide rbegin(), rend()
iter = reverse ? ranges.end() : ranges.begin();
@ -226,6 +266,7 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
result.arena().dependsOn(pairs.arena());
// limits handler
for (int i = pairs.size() - 1; i >= 0; --i) {
ASSERT(iter->range().contains(pairs[i].key));
result.push_back(result.arena(), pairs[i]);
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it's still
// returned. In other words, the total size of the returned value (less the last entry) will be less
@ -255,6 +296,7 @@ ACTOR Future<Standalone<RangeResultRef>> SpecialKeySpace::getRangeAggregationAct
result.arena().dependsOn(pairs.arena());
// limits handler
for (int i = 0; i < pairs.size(); ++i) {
ASSERT(iter->range().contains(pairs[i].key));
result.push_back(result.arena(), pairs[i]);
// Note : behavior here is even the last k-v pair makes total bytes larger than specified, it's still
// returned. In other words, the total size of the returned value (less the last entry) will be less
@ -309,7 +351,112 @@ Future<Optional<Value>> SpecialKeySpace::get(ReadYourWritesTransaction* ryw, con
return getActor(this, ryw, key);
}
ReadConflictRangeImpl::ReadConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
// Routes a set on a special key to the write implementation registered for that key.
// Throws special_keys_write_disabled when the transaction has not enabled special-key-space writes,
// and special_keys_no_write_module_found when no write implementation covers the key.
void SpecialKeySpace::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
    if (!ryw->specialKeySpaceChangeConfiguration()) {
        throw special_keys_write_disabled();
    }
    auto writeModule = writeImpls[key];
    if (writeModule != nullptr) {
        writeModule->set(ryw, key, value);
        return;
    }
    TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound")
        .detail("Key", key.toString())
        .detail("Value", value.toString());
    throw special_keys_no_write_module_found();
}
// Routes a range clear to the single write implementation covering the whole range.
// An empty range is a no-op. Throws special_keys_write_disabled when writes are not enabled,
// special_keys_cross_module_clear when the two ends of the range resolve to different
// implementations, and special_keys_no_write_module_found when no implementation covers the range.
void SpecialKeySpace::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
    if (!ryw->specialKeySpaceChangeConfiguration()) {
        throw special_keys_write_disabled();
    }
    if (range.empty()) {
        return;
    }
    auto firstImpl = writeImpls[range.begin];
    auto lastImpl = writeImpls.rangeContainingKeyBefore(range.end)->value();
    if (firstImpl != lastImpl) {
        // ban cross module clear
        TraceEvent(SevDebug, "SpecialKeySpaceCrossModuleClear").detail("Range", range.toString());
        throw special_keys_cross_module_clear();
    }
    if (firstImpl == nullptr) {
        TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound").detail("Range", range.toString());
        throw special_keys_no_write_module_found();
    }
    firstImpl->clear(ryw, range);
}
// Routes a single-key clear to the write implementation registered for that key.
// Throws special_keys_write_disabled when writes are not enabled and
// special_keys_no_write_module_found when no write implementation covers the key.
void SpecialKeySpace::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
    if (!ryw->specialKeySpaceChangeConfiguration()) {
        throw special_keys_write_disabled();
    }
    auto writeModule = writeImpls[key];
    if (writeModule == nullptr) {
        throw special_keys_no_write_module_found();
    }
    writeModule->clear(ryw, key);
}
// Registers `impl` as the read implementation for `kr` inside `module`; when `type` is READWRITE the
// same object is also registered as the write implementation.
// Asserts that `kr` lies inside the module boundary (or inside normalKeys for TESTONLY), that it does
// not overlap an already registered read range, and that a READWRITE impl really is a
// SpecialKeyRangeRWImpl.
void SpecialKeySpace::registerKeyRange(SpecialKeySpace::MODULE module, SpecialKeySpace::IMPLTYPE type,
                                       const KeyRangeRef& kr, SpecialKeyRangeReadImpl* impl) {
    // module boundary check
    if (module == SpecialKeySpace::MODULE::TESTONLY) {
        ASSERT(normalKeys.contains(kr));
    } else {
        ASSERT(moduleToBoundary.at(module).contains(kr));
    }
    // Every existing range touched by kr must still be unassigned.
    // Note: kr.end should not be the same as another range's begin, although it should work even if
    // they are the same
    auto iter = readImpls.rangeContaining(kr.begin);
    while (true) {
        ASSERT(iter->value() == nullptr);
        if (iter == readImpls.rangeContaining(kr.end)) {
            break; // Note: relax the condition that the end can be another range's start, if needed
        }
        ++iter;
    }
    readImpls.insert(kr, impl);
    if (type != SpecialKeySpace::IMPLTYPE::READWRITE) {
        return;
    }
    // A read-write module serves both reads and writes. Write impls are always a subset of read
    // impls, so the overlap check above already covers them.
    auto rwImpl = dynamic_cast<SpecialKeyRangeRWImpl*>(impl);
    ASSERT(rwImpl);
    writeImpls.insert(kr, rwImpl);
}
// Decodes a special key through the write implementation that covers it.
// Asserts that such an implementation is registered.
Key SpecialKeySpace::decode(const KeyRef& key) {
    auto writeModule = writeImpls[key];
    ASSERT(writeModule != nullptr);
    return writeModule->decode(key);
}
// Decodes both ends of a special key range through the write implementation covering it.
// The range must fall inside a single registered write impl range; this is asserted.
KeyRange SpecialKeySpace::decode(const KeyRangeRef& kr) {
    auto first = writeImpls.rangeContaining(kr.begin);
    ASSERT(first->value() != nullptr);
    auto last = writeImpls.rangeContainingKeyBefore(kr.end);
    // Only ranges within the same underlying impl range can be decoded
    ASSERT(first == last);
    auto writeModule = first->value();
    return KeyRangeRef(writeModule->decode(kr.begin), writeModule->decode(kr.end));
}
// Runs the commit callbacks of every read-write module that has buffered writes on `ryw`.
// Step 1: walk the write-map ranges contained in specialKeys and collect, for each written entry
// (entry.first == true), the RW impl whose range contains the entry's begin key.
// Step 2: wait on commit() of each collected impl in turn. If an impl returns an error message, the
// message is stored on the transaction, traced, and special_keys_api_failure is thrown, so the
// remaining impls are not committed.
ACTOR Future<Void> commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw) {
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys);
state RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
// a set, so that a module with several written ranges is committed only once
state std::set<SpecialKeyRangeRWImpl*> writeModulePtrs;
while (iter != ranges.end()) {
std::pair<bool, Optional<Value>> entry = iter->value();
if (entry.first) {
auto modulePtr = sks->getRWImpls().rangeContaining(iter->begin())->value();
writeModulePtrs.insert(modulePtr);
}
++iter;
}
state std::set<SpecialKeyRangeRWImpl*>::const_iterator it;
for (it = writeModulePtrs.begin(); it != writeModulePtrs.end(); ++it) {
Optional<std::string> msg = wait((*it)->commit(ryw));
if (msg.present()) {
// keep the message on the transaction before failing the commit
ryw->setSpecialKeySpaceErrorMsg(msg.get());
TraceEvent(SevDebug, "SpecialKeySpaceManagemetnAPIError")
.detail("Reason", msg.get())
.detail("Range", (*it)->getKeyRange().toString());
throw special_keys_api_failure();
}
}
return Void();
}
// Starts commitActor for this key space and the given transaction and returns its future.
Future<Void> SpecialKeySpace::commit(ReadYourWritesTransaction* ryw) {
return commitActor(this, ryw);
}
// Read-only implementation; forwards its key range to SpecialKeyRangeReadImpl.
ReadConflictRangeImpl::ReadConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
ACTOR static Future<Standalone<RangeResultRef>> getReadConflictRangeImpl(ReadYourWritesTransaction* ryw, KeyRange kr) {
wait(ryw->pendingReads());
@ -321,14 +468,14 @@ Future<Standalone<RangeResultRef>> ReadConflictRangeImpl::getRange(ReadYourWrite
return getReadConflictRangeImpl(ryw, kr);
}
WriteConflictRangeImpl::WriteConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
WriteConflictRangeImpl::WriteConflictRangeImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
Future<Standalone<RangeResultRef>> WriteConflictRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return ryw->getWriteConflictRangeIntersecting(kr);
}
ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
ConflictingKeysImpl::ConflictingKeysImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
Future<Standalone<RangeResultRef>> ConflictingKeysImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
@ -374,3 +521,439 @@ DDStatsRangeImpl::DDStatsRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr
Future<Standalone<RangeResultRef>> DDStatsRangeImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
return ddMetricsGetRangeActor(ryw, kr);
}
// Returns the special key of a management option:
// <MANAGEMENT module begin> + "options/" + <command> + "/" + <option>.
// Asserts that "<command>/<option>" is one of the registered option names.
Key SpecialKeySpace::getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option) {
    Key optionsPrefix = LiteralStringRef("options/").withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin);
    const std::string optionName = command + "/" + option;
    const bool knownOption = options.find(optionName) != options.end();
    ASSERT(knownOption);
    return optionsPrefix.withSuffix(optionName);
}
// Read-write implementation for the management option keys; forwards its range to SpecialKeyRangeRWImpl.
ManagementCommandsOptionsImpl::ManagementCommandsOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
// Returns the management options that are currently set in this transaction and fall inside `kr`.
// An option is reported (with an empty value) when its key has a written, present entry in the
// transaction's special-key-space write map. Options live only in that map; nothing is read from
// the database here.
Future<Standalone<RangeResultRef>> ManagementCommandsOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
                                                                           KeyRangeRef kr) const {
    Standalone<RangeResultRef> result;
    // Since we only have a limited number of options, a brute force loop here is enough
    for (const auto& option : SpecialKeySpace::getManagementApiOptionsSet()) {
        auto key = getKeyRange().begin.withSuffix(option);
        // ignore all invalid keys
        auto r = ryw->getSpecialKeySpaceWriteMap()[key];
        if (kr.contains(key) && r.first && r.second.present()) {
            // `key` owns its memory and is destroyed at the end of this iteration, so the bytes must
            // be copied into the result's arena; a shallow push_back would leave a dangling key.
            result.push_back_deep(result.arena(), KeyValueRef(key, ValueRef()));
        }
    }
    return result;
}
// Marks a management option as set by recording (true, value) for `key` in the transaction's
// special-key-space write map. Keys that do not name a registered option are silently ignored.
void ManagementCommandsOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
    const std::string option = key.removePrefix(getKeyRange().begin).toString();
    const auto& knownOptions = SpecialKeySpace::getManagementApiOptionsSet();
    if (knownOptions.find(option) == knownOptions.end()) {
        // ignore all invalid keys
        return;
    }
    TraceEvent(SevDebug, "ManagementApiOption").detail("Option", option).detail("Key", key);
    ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(value)));
}
// Unsets options in bulk by raw-erasing `range` from the transaction's special-key-space write map.
// Unlike the single-key overload, no check against the registered option names is made here.
void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
ryw->getSpecialKeySpaceWriteMap().rawErase(range);
}
// Unsets a single management option by raw-erasing its key from the transaction's
// special-key-space write map. Keys that do not name a registered option are silently ignored.
void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
    const std::string option = key.removePrefix(getKeyRange().begin).toString();
    const auto& knownOptions = SpecialKeySpace::getManagementApiOptionsSet();
    if (knownOptions.find(option) == knownOptions.end()) {
        // ignore all invalid keys
        return;
    }
    ryw->getSpecialKeySpaceWriteMap().rawErase(singleKeyRange(key));
}
// Not supported for option keys: always trips ASSERT(false). The return statement is only
// there to satisfy the signature.
Key ManagementCommandsOptionsImpl::decode(const KeyRef& key) const {
// Should never be used
ASSERT(false);
return key;
}
// Not supported for option keys: always trips ASSERT(false). The return statement is only
// there to satisfy the signature.
Key ManagementCommandsOptionsImpl::encode(const KeyRef& key) const {
// Should never be used
ASSERT(false);
return key;
}
// Commit callback for the option keys. Returns an empty Optional (no error message) immediately;
// the option entries are consumed by the commit callbacks of other impls.
Future<Optional<std::string>> ManagementCommandsOptionsImpl::commit(ReadYourWritesTransaction* ryw) {
// Nothing to do, keys should be used by other impls' commit callback
return Optional<std::string>();
}
// Reads a range from a read-write module.
// The special key range `kr` is decoded to the underlying key range, read through `ryw`, and every
// returned key is re-encoded with impl->encode(). Unless read-your-writes is disabled, the rows are
// then merged with the writes buffered in the transaction's special-key-space write map: a written
// range replaces every database row it covers, contributing one row when it carries a value (set)
// and none when it does not (clear).
// Asserts that the underlying read returned fewer than CLIENT_KNOBS->TOO_MANY rows in one batch.
// NOTE(review): the buffered writes are enumerated over the whole impl range, not just `kr` —
// confirm whether writes outside `kr` are meant to show up in the result.
ACTOR Future<Standalone<RangeResultRef>> rwModuleGetRangeActor(ReadYourWritesTransaction* ryw,
                                                               const SpecialKeyRangeRWImpl* impl, KeyRangeRef kr) {
    state KeyRangeRef range = impl->getKeyRange();
    Standalone<RangeResultRef> resultWithoutPrefix =
        wait(ryw->getRange(ryw->getDatabase()->specialKeySpace->decode(kr), CLIENT_KNOBS->TOO_MANY));
    ASSERT(!resultWithoutPrefix.more && resultWithoutPrefix.size() < CLIENT_KNOBS->TOO_MANY);
    Standalone<RangeResultRef> result;
    if (ryw->readYourWritesDisabled()) {
        for (const KeyValueRef& kv : resultWithoutPrefix)
            result.push_back_deep(result.arena(), KeyValueRef(impl->encode(kv.key), kv.value));
    } else {
        RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::Ranges ranges =
            ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
        RangeMap<Key, std::pair<bool, Optional<Value>>, KeyRangeRef>::iterator iter = ranges.begin();
        int index = 0;
        while (iter != ranges.end()) {
            // add all database rows that sort before this write-map range.
            // The encoded key is recomputed for every row, and only after the bounds check, so an
            // empty or exhausted read result is never indexed and no row reuses a stale key.
            while (index < resultWithoutPrefix.size()) {
                Key rk = impl->encode(resultWithoutPrefix[index].key);
                if (!(rk < iter->begin())) break;
                result.push_back_deep(result.arena(), KeyValueRef(rk, resultWithoutPrefix[index].value));
                ++index;
            }
            std::pair<bool, Optional<Value>> entry = iter->value();
            if (entry.first) {
                // add the written entry if it exists
                if (entry.second.present()) {
                    result.push_back_deep(result.arena(), KeyValueRef(iter->begin(), entry.second.get()));
                }
                // move index to skip all database rows shadowed by iter->range()
                while (index < resultWithoutPrefix.size() &&
                       iter->range().contains(impl->encode(resultWithoutPrefix[index].key)))
                    ++index;
            }
            ++iter;
        }
        // add all remaining rows into result
        while (index < resultWithoutPrefix.size()) {
            const KeyValueRef& kv = resultWithoutPrefix[index];
            result.push_back_deep(result.arena(), KeyValueRef(impl->encode(kv.key), kv.value));
            ++index;
        }
    }
    return result;
}
// Read-write implementation for the excluded-servers keys; forwards its range to SpecialKeyRangeRWImpl.
ExcludeServersRangeImpl::ExcludeServersRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
// Reads `kr` through rwModuleGetRangeActor, using this impl for key decoding/encoding.
Future<Standalone<RangeResultRef>> ExcludeServersRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return rwModuleGetRangeActor(ryw, this, kr);
}
// Buffers the set: records (true, value) for `key` in the transaction's special-key-space write map.
void ExcludeServersRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>(value)));
}
// Buffers a single-key clear: records (true, empty Optional) for `key` in the write map.
void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional<Value>()));
}
// Buffers a range clear: records (true, empty Optional) for `range` in the write map.
void ExcludeServersRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
ryw->getSpecialKeySpaceWriteMap().insert(range, std::make_pair(true, Optional<Value>()));
}
// Translates a special key into its underlying key: strips the begin key of the MANAGEMENT module
// range and prepends "\xff/conf/".
Key ExcludeServersRangeImpl::decode(const KeyRef& key) const {
return key.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.withPrefix(LiteralStringRef("\xff/conf/"));
}
// Inverse of decode(): strips "\xff/conf/" and prepends the begin key of the MANAGEMENT module range.
Key ExcludeServersRangeImpl::encode(const KeyRef& key) const {
return key.removePrefix(LiteralStringRef("\xff/conf/"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
}
// Parses the network addresses of all buffered "set" entries of the exclude (or, when `failed` is
// true, the failed) management range of `ryw`.
// Each entry that is written and carries a value has its key suffix parsed as an AddressExclusion
// and appended to `addresses` and inserted into `exclusions`. Cleared entries (includes) are skipped.
// Returns true when every address parsed. On the first invalid address, `msg` is set to a JSON
// error string and false is returned; addresses parsed before that point stay in the outputs.
bool parseNetWorkAddrFromKeys(ReadYourWritesTransaction* ryw, bool failed, std::vector<AddressExclusion>& addresses,
                              std::set<AddressExclusion>& exclusions, Optional<std::string>& msg) {
    KeyRangeRef range = failed ? SpecialKeySpace::getManamentApiCommandRange("failed")
                               : SpecialKeySpace::getManamentApiCommandRange("exclude");
    auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(range);
    auto iter = ranges.begin();
    while (iter != ranges.end()) {
        auto entry = iter->value();
        // only check for exclude(set) operation, include(clear) are not checked
        TraceEvent(SevInfo, "ParseNetworkAddress")
            .detail("Valid", entry.first)
            .detail("Set", entry.second.present())
            .detail("Key", iter->begin().toString());
        if (entry.first && entry.second.present()) {
            Key address = iter->begin().removePrefix(range.begin);
            auto a = AddressExclusion::parse(address);
            if (!a.isValid()) {
                std::string error = "ERROR: \'" + address.toString() + "\' is not a valid network endpoint address\n";
                if (address.toString().find(":tls") != std::string::npos)
                    error += " Do not include the `:tls' suffix when naming a process\n";
                // Only set entries reach this point, so the command is always an exclude variant.
                msg = ManagementAPIError::toJsonString(false, failed ? "exclude failed" : "exclude", error);
                return false;
            }
            addresses.push_back(a);
            exclusions.insert(a);
        }
        ++iter;
    }
    return true;
}
// Safety check run before excluding servers. Returns true when the exclusion may proceed; otherwise
// sets *msg to a JSON error string (built with ManagementAPIError::toJsonString) and returns false.
// Checks, in order:
//  1. when markFailed is set, checkSafeExclusions(db, *addresses) must report the exclusion as safe;
//  2. the cluster status must contain "cluster.processes", and every storage process in it must
//     have an "address" (plus "disk.total_bytes" / "disk.free_bytes" when it is not excluded);
//  3. not all storage servers may end up excluded, and the estimate
//     (1 - worstFreeSpaceRatio) * ssTotalCount / (ssTotalCount - ssExcludedCount) must not exceed 0.9.
ACTOR Future<bool> checkExclusion(Database db, std::vector<AddressExclusion>* addresses,
std::set<AddressExclusion>* exclusions, bool markFailed, Optional<std::string>* msg) {
if (markFailed) {
state bool safe;
try {
bool _safe = wait(checkSafeExclusions(db, *addresses));
safe = _safe;
} catch (Error& e) {
// cancellation is propagated; any other error is logged and treated as "not safe"
if (e.code() == error_code_actor_cancelled) throw;
TraceEvent("CheckSafeExclusionsError").error(e);
safe = false;
}
if (!safe) {
std::string temp = "ERROR: It is unsafe to exclude the specified servers at this time.\n"
"Please check that this exclusion does not bring down an entire storage team.\n"
"Please also ensure that the exclusion will keep a majority of coordinators alive.\n"
"You may add more storage processes or coordinators to make the operation safe.\n"
"Call set(\"0xff0xff/management/failed/<ADDRESS...>\", ...) to exclude without "
"performing safety checks.\n";
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", temp);
return false;
}
}
StatusObject status = wait(StatusClient::statusFetcher(db));
// shared error text for every case where the status document cannot be interpreted
state std::string errorString =
"ERROR: Could not calculate the impact of this exclude on the total free space in the cluster.\n"
"Please try the exclude again in 30 seconds.\n"
"Call set(\"0xff0xff/management/options/exclude/force\", ...) first to exclude without checking free "
"space.\n";
StatusObjectReader statusObj(status);
StatusObjectReader statusObjCluster;
if (!statusObj.get("cluster", statusObjCluster)) {
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
StatusObjectReader processesMap;
if (!statusObjCluster.get("processes", processesMap)) {
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
state int ssTotalCount = 0;
state int ssExcludedCount = 0;
// smallest free_bytes / total_bytes ratio seen among the storage processes that stay included
state double worstFreeSpaceRatio = 1.0;
try {
for (auto proc : processesMap.obj()) {
bool storageServer = false;
StatusArray rolesArray = proc.second.get_obj()["roles"].get_array();
for (StatusObjectReader role : rolesArray) {
if (role["role"].get_str() == "storage") {
storageServer = true;
break;
}
}
// Skip non-storage servers in free space calculation
if (!storageServer) continue;
StatusObjectReader process(proc.second);
std::string addrStr;
if (!process.get("address", addrStr)) {
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
NetworkAddress addr = NetworkAddress::parse(addrStr);
// excluded either already (per the status document) or by the exclusions being requested now
bool excluded =
(process.has("excluded") && process.last().get_bool()) || addressExcluded(*exclusions, addr);
ssTotalCount++;
if (excluded) ssExcludedCount++;
if (!excluded) {
StatusObjectReader disk;
if (!process.get("disk", disk)) {
*msg =
ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
int64_t total_bytes;
if (!disk.get("total_bytes", total_bytes)) {
*msg =
ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
int64_t free_bytes;
if (!disk.get("free_bytes", free_bytes)) {
*msg =
ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
worstFreeSpaceRatio = std::min(worstFreeSpaceRatio, double(free_bytes) / total_bytes);
}
}
} catch (...) // std::exception
{
// anything thrown while walking the status document is reported as "could not calculate"
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", errorString);
return false;
}
// the first condition also keeps the division below from dividing by zero
if (ssExcludedCount == ssTotalCount ||
(1 - worstFreeSpaceRatio) * ssTotalCount / (ssTotalCount - ssExcludedCount) > 0.9) {
std::string temp = "ERROR: This exclude may cause the total free space in the cluster to drop below 10%.\n"
"Call set(\"0xff0xff/management/options/exclude/force\", ...) first to exclude without "
"checking free space.\n";
*msg = ManagementAPIError::toJsonString(false, markFailed ? "exclude failed" : "exclude", temp);
return false;
}
return true;
}
// Applies the buffered "include" operations of `ryw` to its underlying transaction.
// An include is a cleared entry (written, no value) in the exclude or failed management range.
// For each one, the matching version key is added as a read conflict and rewritten with a fresh
// random id, and the decoded underlying key range is cleared.
void includeServers(ReadYourWritesTransaction* ryw) {
    ryw->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    ryw->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
    ryw->setOption(FDBTransactionOptions::LOCK_AWARE);
    ryw->setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
    // includeServers might be used in an emergency transaction, so make sure it is
    // retry-self-conflicting and CAUSAL_WRITE_RISKY
    ryw->setOption(FDBTransactionOptions::CAUSAL_WRITE_RISKY);
    std::string versionKey = deterministicRandom()->randomUniqueID().toString();
    Transaction& tr = ryw->getTransaction();

    // Shared walk over one command range; only written entries without a value are includes.
    auto applyIncludes = [&](const char* command, const KeyRef& versionStampKey) {
        auto ranges =
            ryw->getSpecialKeySpaceWriteMap().containedRanges(SpecialKeySpace::getManamentApiCommandRange(command));
        for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) {
            auto entry = iter->value();
            if (!entry.first || entry.second.present()) {
                continue;
            }
            tr.addReadConflictRange(singleKeyRange(versionStampKey));
            tr.set(versionStampKey, versionKey);
            tr.clear(ryw->getDatabase()->specialKeySpace->decode(iter->range()));
        }
    };

    // for excluded servers
    applyIncludes("exclude", excludedServersVersionKey);
    // for failed servers
    applyIncludes("failed", failedServersVersionKey);
}
// Commit-time handler shared by the "exclude" and "failed" management special-key ranges.
// `failed` selects the command being committed: it is forwarded to the address parser, to the safety
// check and to excludeServers(), and it picks the "failed" vs "exclude" force-option key.
// Returns `result`, which the parser and the safety check receive by reference / pointer so they can fill it.
// NOTE(review): presumably `result` carries an error message on failure and stays empty on success —
// confirm against parseNetWorkAddrFromKeys / checkExclusion.
ACTOR Future<Optional<std::string>> excludeCommitActor(ReadYourWritesTransaction* ryw, bool failed) {
// parse network addresses
state Optional<std::string> result;
state std::vector<AddressExclusion> addresses;
state std::set<AddressExclusion> exclusions;
// Stop early with whatever the parser stored in `result` when parsing reports failure
if (!parseNetWorkAddrFromKeys(ryw, failed, addresses, exclusions, result)) return result;
// If force option is not set, we need to do safety check
auto force = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey(
failed ? "failed" : "exclude", "force")];
// only do safety check when we have servers to be excluded and the force option key is not set
// (the option counts as set when its write-map entry is flagged as written and holds a value)
if (addresses.size() && !(force.first && force.second.present())) {
bool safe = wait(checkExclusion(ryw->getDatabase(), &addresses, &exclusions, failed, &result));
if (!safe) return result;
}
// Apply the new exclusions first, then the clears recorded in the write map
excludeServers(ryw->getTransaction(), addresses, failed);
includeServers(ryw);
return result;
}
// Commit hook for the "exclude" range: runs the shared commit actor with failed == false.
Future<Optional<std::string>> ExcludeServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
return excludeCommitActor(ryw, false);
}
// Forwards the special key range `kr` to the read-write base implementation.
FailedServersRangeImpl::FailedServersRangeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
// Range read over the "failed" special keys; delegates entirely to rwModuleGetRangeActor, passing this
// implementation along with the requested range.
Future<Standalone<RangeResultRef>> FailedServersRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return rwModuleGetRangeActor(ryw, this, kr);
}
// Records a pending set of `key` in the transaction's special-key-space write map.
// The entry is the pair (true, value): the flag marks the key as written, the optional holds the new value.
void FailedServersRangeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) {
    auto pendingWrite = std::make_pair(true, Optional<Value>(value));
    ryw->getSpecialKeySpaceWriteMap().insert(key, pendingWrite);
}
// Records a pending clear of the single `key` in the special-key-space write map.
// A clear is stored as the pair (true, empty optional): written, but with no value.
void FailedServersRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
    auto pendingClear = std::make_pair(true, Optional<Value>());
    ryw->getSpecialKeySpaceWriteMap().insert(key, pendingClear);
}
// Records a pending clear of the whole `range` in the special-key-space write map.
// A clear is stored as the pair (true, empty optional): written, but with no value.
void FailedServersRangeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
    auto pendingClear = std::make_pair(true, Optional<Value>());
    ryw->getSpecialKeySpaceWriteMap().insert(range, pendingClear);
}
// Maps a special key of this range to the key it stands for: the management module's begin key is
// stripped from the front and "\xff/conf/" is prepended instead.
Key FailedServersRangeImpl::decode(const KeyRef& key) const {
    const KeyRef managementPrefix = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin;
    KeyRef remainder = key.removePrefix(managementPrefix);
    return remainder.withPrefix(LiteralStringRef("\xff/conf/"));
}
// Inverse of decode(): strips "\xff/conf/" from the front of `key` and prepends the management
// module's begin key, yielding the corresponding special key.
Key FailedServersRangeImpl::encode(const KeyRef& key) const {
    const KeyRef managementPrefix = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin;
    KeyRef remainder = key.removePrefix(LiteralStringRef("\xff/conf/"));
    return remainder.withPrefix(managementPrefix);
}
// Commit hook for the "failed" range: runs the shared commit actor with failed == true.
Future<Optional<std::string>> FailedServersRangeImpl::commit(ReadYourWritesTransaction* ryw) {
return excludeCommitActor(ryw, true);
}
// Builds the range result for the "exclusion in progress" special keys.
// Collects every address that matches the current exclusion list but is still referenced by the server
// list or by the logs value, and emits one key per such address: `prefix` + address.toString(), with an
// empty value. Only keys contained in `kr` are returned.
ACTOR Future<Standalone<RangeResultRef>> ExclusionInProgressActor(ReadYourWritesTransaction* ryw, KeyRef prefix,
KeyRangeRef kr) {
state Standalone<RangeResultRef> result;
state Transaction& tr = ryw->getTransaction();
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); // necessary?
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state std::vector<AddressExclusion> excl = wait((getExcludedServers(&tr)));
state std::set<AddressExclusion> exclusions(excl.begin(), excl.end());
// std::set de-duplicates addresses that are referenced from more than one place
state std::set<NetworkAddress> inProgressExclusion;
// Just getting a consistent read version proves that a set of tlogs satisfying the exclusions has completed
// recovery. Check that there aren't any storage servers with addresses violating the exclusions.
state Standalone<RangeResultRef> serverList = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT(!serverList.more && serverList.size() < CLIENT_KNOBS->TOO_MANY);
for (auto& s : serverList) {
// Both the primary and, when present, the secondary address of each server are checked
auto addresses = decodeServerListValue(s.value).getKeyValues.getEndpoint().addresses;
if (addressExcluded(exclusions, addresses.address)) {
inProgressExclusion.insert(addresses.address);
}
if (addresses.secondaryAddress.present() && addressExcluded(exclusions, addresses.secondaryAddress.get())) {
inProgressExclusion.insert(addresses.secondaryAddress.get());
}
}
Optional<Standalone<StringRef>> value = wait(tr.get(logsKey));
ASSERT(value.present());
auto logs = decodeLogsValue(value.get());
// Both halves of the decoded logs value are scanned the same way. A default-constructed NetworkAddress
// is also reported as in progress.
// NOTE(review): presumably a default address means the log's address is unknown — confirm.
for (auto const& log : logs.first) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
inProgressExclusion.insert(log.second);
}
}
for (auto const& log : logs.second) {
if (log.second == NetworkAddress() || addressExcluded(exclusions, log.second)) {
inProgressExclusion.insert(log.second);
}
}
for (auto const& address : inProgressExclusion) {
Key addrKey = prefix.withSuffix(address.toString());
if (kr.contains(addrKey)) {
result.push_back(result.arena(), KeyValueRef(addrKey, ValueRef()));
// Tie addrKey's arena to the result so the key bytes referenced by the pushed KeyValueRef stay alive
result.arena().dependsOn(addrKey.arena());
}
}
return result;
}
// Forwards the special key range `kr` to the async read-only base implementation.
ExclusionInProgressRangeImpl::ExclusionInProgressRangeImpl(KeyRangeRef kr) : SpecialKeyRangeAsyncImpl(kr) {}
// Range read over the in-progress-exclusion keys; the begin key of this implementation's registered
// range is passed as the prefix under which ExclusionInProgressActor builds the result keys.
Future<Standalone<RangeResultRef>> ExclusionInProgressRangeImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
return ExclusionInProgressActor(ryw, getKeyRange().begin, kr);
}

View File

@ -33,27 +33,58 @@
#include "fdbclient/ReadYourWrites.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class SpecialKeyRangeBaseImpl {
class SpecialKeyRangeReadImpl {
public:
// Each derived class only needs to implement this simple version of getRange
virtual Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
explicit SpecialKeyRangeBaseImpl(KeyRangeRef kr) : range(kr) {}
explicit SpecialKeyRangeReadImpl(KeyRangeRef kr) : range(kr) {}
KeyRangeRef getKeyRange() const { return range; }
// true if the getRange call can emit more than one rpc call,
// we cache the results to keep consistency in the same getRange lifetime
// TODO : give this function a more descriptive name
virtual bool isAsync() const { return false; }
virtual ~SpecialKeyRangeBaseImpl() {}
virtual ~SpecialKeyRangeReadImpl() {}
protected:
KeyRange range; // underlying key range for this function
};
class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeBaseImpl {
class ManagementAPIError {
public:
explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
static std::string toJsonString(bool retriable, const std::string& command, const std::string& msg) {
json_spirit::mObject errorObj;
errorObj["retriable"] = retriable;
errorObj["command"] = command;
errorObj["message"] = msg;
return json_spirit::write_string(json_spirit::mValue(errorObj), json_spirit::Output_options::raw_utf8);
}
private:
ManagementAPIError(){};
};
// Interface for special key ranges that accept writes in addition to reads.
// On top of the read interface inherited from SpecialKeyRangeReadImpl, a derived class provides
// set / clear handlers, a commit hook, and the translation between special keys and real keys.
class SpecialKeyRangeRWImpl : public SpecialKeyRangeReadImpl {
public:
    explicit SpecialKeyRangeRWImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
    virtual ~SpecialKeyRangeRWImpl() {}

    // Write handlers for a single key, a single-key clear and a range clear
    virtual void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) = 0;
    virtual void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) = 0;
    virtual void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) = 0;

    // All delayed async operations of writes in special-key-space
    virtual Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) = 0;

    // Given the special key to write, return the real key that needs to be modified
    virtual Key decode(const KeyRef& key) const = 0;
    // Given the read key, return the corresponding special key
    virtual Key encode(const KeyRef& key) const = 0;
};
class SpecialKeyRangeAsyncImpl : public SpecialKeyRangeReadImpl {
public:
explicit SpecialKeyRangeAsyncImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const = 0;
@ -65,7 +96,7 @@ public:
bool isAsync() const override { return true; }
ACTOR static Future<Standalone<RangeResultRef>> getRangeAsyncActor(const SpecialKeyRangeBaseImpl* skrAyncImpl,
ACTOR static Future<Standalone<RangeResultRef>> getRangeAsyncActor(const SpecialKeyRangeReadImpl* skrAyncImpl,
ReadYourWritesTransaction* ryw, KeyRangeRef kr,
Optional<Standalone<RangeResultRef>>* cache) {
ASSERT(skrAyncImpl->getKeyRange().contains(kr));
@ -95,6 +126,8 @@ public:
enum class MODULE {
CLUSTERFILEPATH,
CONNECTIONSTRING,
ERRORMSG, // A single key space contains a json string which describes the last error in special-key-space
MANAGEMENT, // Management-API
METRICS, // data-distribution metrics
TESTONLY, // only used by correctness tests
TRANSACTION, // transaction related info, conflicting keys, read/write conflict range
@ -103,67 +136,69 @@ public:
WORKERINTERFACE,
};
// Kind of implementation being registered for a special key range; passed to registerKeyRange
// together with the module and the range.
enum class IMPLTYPE {
READONLY, // The underlying special key range can only be called with get and getRange
READWRITE // The underlying special key range can be called with get, getRange, set, clear
};
SpecialKeySpace(KeyRef spaceStartKey = Key(), KeyRef spaceEndKey = normalKeys.end, bool testOnly = true);
Future<Optional<Value>> get(ReadYourWritesTransaction* ryw, const Key& key);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse = false);
// Inline constructor: sets up the key space [spaceStartKey, spaceEndKey).
// `impls` starts with nullptr (no implementation registered) everywhere; `modules` starts as TESTONLY
// when testOnly is true and UNKNOWN otherwise. Module boundaries are only initialized when !testOnly.
SpecialKeySpace(KeyRef spaceStartKey = Key(), KeyRef spaceEndKey = normalKeys.end, bool testOnly = true)
: range(KeyRangeRef(spaceStartKey, spaceEndKey)), impls(nullptr, spaceEndKey),
modules(testOnly ? SpecialKeySpace::MODULE::TESTONLY : SpecialKeySpace::MODULE::UNKNOWN, spaceEndKey) {
// Default begin of KeyRangeMap is Key(), insert the range to update start key if needed
impls.insert(range, nullptr);
if (!testOnly) modulesBoundaryInit(); // testOnly is used in the correctness workload
}
// Initialize module boundaries, used to handle cross_module_read.
// For every (module, range) pair in moduleToBoundary: assert the range lies inside this key space and
// does not touch an already registered module, then record it in `modules` and reset `impls` over it.
void modulesBoundaryInit() {
for (const auto& pair : moduleToBoundary) {
ASSERT(range.contains(pair.second));
// Make sure the module is not overlapping with any registered modules
// Note: as with ranges, one module's end cannot be another module's start; relax the condition if needed
ASSERT(modules.rangeContaining(pair.second.begin) == modules.rangeContaining(pair.second.end) &&
modules[pair.second.begin] == SpecialKeySpace::MODULE::UNKNOWN);
modules.insert(pair.second, pair.first);
impls.insert(pair.second, nullptr); // Note: Due to underlying implementation, the insertion here is
// important to make cross_module_read being handled correctly
}
}
// Registers `impl` as the handler for key range `kr` inside `module`.
// Asserts that `kr` lies within the module's boundary (within normalKeys for TESTONLY) and that no
// implementation is already registered on any range from the one containing kr.begin through the one
// containing kr.end.
void registerKeyRange(SpecialKeySpace::MODULE module, const KeyRangeRef& kr, SpecialKeyRangeBaseImpl* impl) {
// module boundary check
if (module == SpecialKeySpace::MODULE::TESTONLY)
ASSERT(normalKeys.contains(kr));
else
ASSERT(moduleToBoundary.at(module).contains(kr));
// make sure the registered range is not overlapping with existing ones
// Note: kr.end should not be the same as another range's begin, although it should work even if they are the same
// The loop has no condition of its own; it stops after checking the range that contains kr.end
for (auto iter = impls.rangeContaining(kr.begin); true; ++iter) {
ASSERT(iter->value() == nullptr);
if (iter == impls.rangeContaining(kr.end))
break; // relax the condition that the end can be another range's start, if needed
}
impls.insert(kr, impl);
}
void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value);
KeyRangeMap<SpecialKeyRangeBaseImpl*>& getImpls() { return impls; }
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range);
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key);
Future<Void> commit(ReadYourWritesTransaction* ryw);
void registerKeyRange(SpecialKeySpace::MODULE module, SpecialKeySpace::IMPLTYPE type, const KeyRangeRef& kr,
SpecialKeyRangeReadImpl* impl);
Key decode(const KeyRef& key);
KeyRange decode(const KeyRangeRef& kr);
KeyRangeMap<SpecialKeyRangeReadImpl*>& getReadImpls() { return readImpls; }
KeyRangeMap<SpecialKeyRangeRWImpl*>& getRWImpls() { return writeImpls; }
KeyRangeMap<SpecialKeySpace::MODULE>& getModules() { return modules; }
KeyRangeRef getKeyRange() const { return range; }
static KeyRangeRef getModuleRange(SpecialKeySpace::MODULE module) { return moduleToBoundary.at(module); }
// Returns the special key range registered for management command `command`.
// Looks the command up with unordered_map::at, so an unknown command throws std::out_of_range.
// NOTE: the name is misspelled ("Manament"); callers use this spelling.
static KeyRangeRef getManamentApiCommandRange(const std::string& command) {
return managementApiCommandToRange.at(command);
}
// Returns the begin key of the special key range registered for management command `command`.
// Looks the command up with unordered_map::at, so an unknown command throws std::out_of_range.
static KeyRef getManagementApiCommandPrefix(const std::string& command) {
return managementApiCommandToRange.at(command).begin;
}
static Key getManagementApiCommandOptionSpecialKey(const std::string& command, const std::string& option);
static const std::unordered_set<std::string>& getManagementApiOptionsSet() { return options; }
private:
ACTOR static Future<Optional<Value>> getActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, KeyRef key);
ACTOR static Future<Standalone<RangeResultRef>> checkRYWValid(SpecialKeySpace* sks,
ReadYourWritesTransaction* ryw, KeySelector begin,
KeySelector end, GetRangeLimits limits,
bool reverse);
ACTOR static Future<Standalone<RangeResultRef>> checkRYWValid(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw,
KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse);
ACTOR static Future<Standalone<RangeResultRef>> getRangeAggregationActor(SpecialKeySpace* sks,
ReadYourWritesTransaction* ryw,
KeySelector begin, KeySelector end,
GetRangeLimits limits, bool reverse);
KeyRange range;
KeyRangeMap<SpecialKeyRangeBaseImpl*> impls;
KeyRangeMap<SpecialKeyRangeReadImpl*> readImpls;
KeyRangeMap<SpecialKeySpace::MODULE> modules;
KeyRangeMap<SpecialKeyRangeRWImpl*> writeImpls;
KeyRange range; // key space range, (\xff\xff, \xff\xff\xff) in prod and (, \xff) in test
static std::unordered_map<SpecialKeySpace::MODULE, KeyRange> moduleToBoundary;
static std::unordered_map<std::string, KeyRange>
managementApiCommandToRange; // management command to its special keys' range
static std::unordered_set<std::string> options; // "<command>/<option>"
// Initialize module boundaries, used to handle cross_module_read
void modulesBoundaryInit();
};
// Use special key prefix "\xff\xff/transaction/conflicting_keys/<some_key>",
@ -172,19 +207,19 @@ private:
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
// Currently, the conflicting keyranges returned are original read_conflict_ranges or union of them.
class ConflictingKeysImpl : public SpecialKeyRangeBaseImpl {
class ConflictingKeysImpl : public SpecialKeyRangeReadImpl {
public:
explicit ConflictingKeysImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class ReadConflictRangeImpl : public SpecialKeyRangeBaseImpl {
class ReadConflictRangeImpl : public SpecialKeyRangeReadImpl {
public:
explicit ReadConflictRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
class WriteConflictRangeImpl : public SpecialKeyRangeBaseImpl {
class WriteConflictRangeImpl : public SpecialKeyRangeReadImpl {
public:
explicit WriteConflictRangeImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
@ -196,5 +231,47 @@ public:
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
// Read-write special key range implementation declared for management command options.
// NOTE(review): presumably backs the "<command>/<option>" option keys (e.g. the exclude "force" option) —
// confirm against the definitions in the .cpp file.
class ManagementCommandsOptionsImpl : public SpecialKeyRangeRWImpl {
public:
    explicit ManagementCommandsOptionsImpl(KeyRangeRef kr);

    // Read path
    Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;

    // Write path
    void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
    void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
    void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
    Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;

    // Special key <-> real key translation
    Key encode(const KeyRef& key) const override;
    Key decode(const KeyRef& key) const override;
};
// Read-write special key range implementation for the "exclude" management command.
// Its commit() runs the shared exclude commit actor with failed == false.
class ExcludeServersRangeImpl : public SpecialKeyRangeRWImpl {
public:
    explicit ExcludeServersRangeImpl(KeyRangeRef kr);

    // Read path
    Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;

    // Write path
    void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
    void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
    void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
    Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;

    // Special key <-> real key translation
    Key encode(const KeyRef& key) const override;
    Key decode(const KeyRef& key) const override;
};
// Read-write special key range implementation for the "failed" management command.
// set / clear only record the write in the transaction's special-key-space write map; the work happens
// in commit(), which runs the shared exclude commit actor with failed == true.
class FailedServersRangeImpl : public SpecialKeyRangeRWImpl {
public:
    explicit FailedServersRangeImpl(KeyRangeRef kr);

    // Read path
    Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;

    // Write path
    void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override;
    void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
    void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
    Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;

    // Special key <-> real key translation (management module prefix <-> "\xff/conf/")
    Key encode(const KeyRef& key) const override;
    Key decode(const KeyRef& key) const override;
};
// Read-only, async special key range listing the addresses whose exclusion is still in progress.
// getRange() returns one key per such address, prefixed with the begin key of the registered range.
class ExclusionInProgressRangeImpl : public SpecialKeyRangeAsyncImpl {
public:
    explicit ExclusionInProgressRangeImpl(KeyRangeRef kr);

    Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -264,6 +264,8 @@ description is not currently required but encouraged.
description="The transaction can retrieve keys that are conflicting with other transactions." />
<Option name="special_key_space_relaxed" code="713"
description="By default, the special key space will only allow users to read from exactly one module (a subspace in the special key space). Use this option to allow reading from zero or more modules. Users who set this option should be prepared for new modules, which may have different behaviors than the modules they're currently reading. For example, a new module might block or return an error." />
<Option name="special_key_space_enable_writes" code="714"
description="By default, users are not allowed to write to special keys. Enabling this option will implicitly enable all options required to achieve the configuration change." />
<Option name="tag" code="800" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."
description="Adds a tag to the transaction that can be used to apply manual targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="auto_throttle_tag" code="801" paramType="String" paramDescription="String identifier used to associated this transaction with a throttling group. Must not exceed 16 characters."

View File

@ -115,6 +115,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
unsigned int operationId;
int64_t maximumTotalData;
bool specialKeysRelaxed;
bool specialKeysWritesEnabled;
bool success;
@ -131,6 +132,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
useSystemKeys = deterministicRandom()->coinflip();
initialKeyDensity = deterministicRandom()->random01(); // This fraction of keys are present before the first transaction (and after an unknown result)
specialKeysRelaxed = deterministicRandom()->coinflip();
// Only enable special keys writes when allowed to access system keys
specialKeysWritesEnabled = useSystemKeys && deterministicRandom()->coinflip();
// See https://github.com/apple/foundationdb/issues/2424
if (BUGGIFY) {
@ -162,7 +165,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
.detail("ValueSizeRange", valueSizeRange.second)
.detail("MaxClearSize", maxClearSize)
.detail("UseSystemKeys", useSystemKeys)
.detail("SpecialKeysRelaxed", specialKeysRelaxed);
.detail("SpecialKeysRelaxed", specialKeysRelaxed)
.detail("SpecialKeysWritesEnabled", specialKeysWritesEnabled);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketSent").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
TraceEvent("RemapEventSeverity").detail("TargetEvent", "LargePacketReceived").detail("OriginalSeverity", SevWarnAlways).detail("NewSeverity", SevInfo);
@ -225,6 +229,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
if( self->useSystemKeys )
tr->setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
if (self->specialKeysRelaxed) tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED);
if (self->specialKeysWritesEnabled)
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
int end = std::min(self->nodes, i+keysPerBatch );
tr->clear( KeyRangeRef( self->getKeyForIndex(i), self->getKeyForIndex(end) ) );
@ -285,6 +291,9 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
if (self->specialKeysRelaxed) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_RELAXED);
}
if (self->specialKeysWritesEnabled) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
}
tr->addWriteConflictRange( self->conflictRange );
try {
@ -953,12 +962,23 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
key = makeKey();
}
value = makeValue();
contract = {
std::make_pair( error_code_key_too_large, ExceptionContract::requiredIf(key.size() > (key.startsWith(systemKeys.begin) ? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT : CLIENT_KNOBS->KEY_SIZE_LIMIT)) ),
std::make_pair( error_code_value_too_large, ExceptionContract::requiredIf(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT) ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
(key >= (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) )
};
contract = { std::make_pair(
error_code_key_too_large,
ExceptionContract::requiredIf(key.size() > (key.startsWith(systemKeys.begin)
? CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT
: CLIENT_KNOBS->KEY_SIZE_LIMIT))),
std::make_pair(error_code_value_too_large,
ExceptionContract::requiredIf(value.size() > CLIENT_KNOBS->VALUE_SIZE_LIMIT)),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
(key >= (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) &&
!specialKeys.contains(key))),
std::make_pair(error_code_special_keys_write_disabled,
ExceptionContract::requiredIf(specialKeys.contains(key) &&
!workload->specialKeysWritesEnabled)),
std::make_pair(error_code_special_keys_no_write_module_found,
ExceptionContract::possibleIf(specialKeys.contains(key) &&
workload->specialKeysWritesEnabled)) };
}
void callback(Reference<ITransaction> tr) {
@ -982,11 +1002,22 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
key1 = makeKey();
key2 = makeKey();
}
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
contract = {
std::make_pair( error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2) ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
(key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) )
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
((key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange)),
std::make_pair(error_code_special_keys_write_disabled,
ExceptionContract::requiredIf(isSpecialKeyRange && !workload->specialKeysWritesEnabled)),
std::make_pair(error_code_special_keys_cross_module_clear,
ExceptionContract::possibleIf(isSpecialKeyRange && workload->specialKeysWritesEnabled)),
std::make_pair(error_code_special_keys_no_write_module_found,
ExceptionContract::possibleIf(isSpecialKeyRange && workload->specialKeysWritesEnabled))
};
}
@ -1011,11 +1042,22 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
key1 = makeKey();
key2 = makeKey();
}
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
contract = {
std::make_pair( error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2) ),
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
(key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) )
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
((key1 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end)) ||
(key2 > (workload->useSystemKeys ? systemKeys.end : normalKeys.end))) &&
!isSpecialKeyRange)),
std::make_pair(error_code_special_keys_write_disabled,
ExceptionContract::requiredIf(isSpecialKeyRange && !workload->specialKeysWritesEnabled)),
std::make_pair(error_code_special_keys_cross_module_clear,
ExceptionContract::possibleIf(isSpecialKeyRange && workload->specialKeysWritesEnabled)),
std::make_pair(error_code_special_keys_no_write_module_found,
ExceptionContract::possibleIf(isSpecialKeyRange && workload->specialKeysWritesEnabled))
};
}
@ -1038,10 +1080,15 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
while (isProtectedKey(key)) {
key = makeKey();
}
contract = {
std::make_pair( error_code_key_outside_legal_range, ExceptionContract::requiredIf(
key >= (workload->useSystemKeys ? systemKeys.end : normalKeys.end) ) )
};
contract = { std::make_pair(error_code_key_outside_legal_range,
ExceptionContract::requiredIf(
key >= (workload->useSystemKeys ? systemKeys.end : normalKeys.end))),
std::make_pair(error_code_special_keys_write_disabled,
ExceptionContract::requiredIf(specialKeys.contains(key) &&
!workload->specialKeysWritesEnabled)),
std::make_pair(error_code_special_keys_no_write_module_found,
ExceptionContract::possibleIf(specialKeys.contains(key) &&
workload->specialKeysWritesEnabled)) };
}
void callback(Reference<ITransaction> tr) {

View File

@ -18,16 +18,18 @@
* limitations under the License.
*/
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/Schemas.h"
#include "fdbclient/SpecialKeySpace.actor.h"
#include "fdbserver/TesterInterface.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h"
class SKSCTestImpl : public SpecialKeyRangeBaseImpl {
class SKSCTestImpl : public SpecialKeyRangeReadImpl {
public:
explicit SKSCTestImpl(KeyRangeRef kr) : SpecialKeyRangeBaseImpl(kr) {}
explicit SKSCTestImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
virtual Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
ASSERT(range.contains(kr));
auto resultFuture = ryw->getRange(kr, CLIENT_KNOBS->TOO_MANY);
@ -94,7 +96,8 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
self->keys.push_back_deep(self->keys.arena(), KeyRangeRef(startKey, endKey));
self->impls.push_back(std::make_shared<SKSCTestImpl>(KeyRangeRef(startKey, endKey)));
// Although there are already ranges registered, the testing range will replace them
cx->specialKeySpace->registerKeyRange(SpecialKeySpace::MODULE::TESTONLY, self->keys.back(),
cx->specialKeySpace->registerKeyRange(SpecialKeySpace::MODULE::TESTONLY,
SpecialKeySpace::IMPLTYPE::READONLY, self->keys.back(),
self->impls.back().get());
// generate keys in each key range
int keysInRange = deterministicRandom()->randomInt(self->minKeysPerRange, self->maxKeysPerRange + 1);
@ -108,7 +111,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
}
ACTOR Future<Void> _start(Database cx, SpecialKeySpaceCorrectnessWorkload* self) {
testRywLifetime(cx);
wait(timeout(self->testModuleRangeReadErrors(cx, self) && self->getRangeCallActor(cx, self) &&
wait(timeout(self->testSpecialKeySpaceErrors(cx, self) && self->getRangeCallActor(cx, self) &&
testConflictRanges(cx, /*read*/ true, self) && testConflictRanges(cx, /*read*/ false, self),
self->testDuration, Void()));
return Void();
@ -235,7 +238,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
return GetRangeLimits(rowLimits, byteLimits);
}
ACTOR Future<Void> testModuleRangeReadErrors(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
ACTOR Future<Void> testSpecialKeySpaceErrors(Database cx_, SpecialKeySpaceCorrectnessWorkload* self) {
Database cx = cx_->clone();
state Reference<ReadYourWritesTransaction> tx = Reference(new ReadYourWritesTransaction(cx));
// begin key outside module range
@ -349,6 +352,56 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
} catch (Error& e) {
throw;
}
// Errors introduced by SpecialKeyRangeRWImpl
// Writes are disabled by default
try {
tx->set(LiteralStringRef("\xff\xff/I_am_not_a_range_can_be_written"), ValueRef());
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_special_keys_write_disabled);
tx->reset();
}
// The special key is not in a range that can be called with set
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("\xff\xff/I_am_not_a_range_can_be_written"), ValueRef());
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_special_keys_no_write_module_found);
tx->reset();
}
// A clear that crosses two ranges is forbidden
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->clear(KeyRangeRef(SpecialKeySpace::getManamentApiCommandRange("exclude").begin,
SpecialKeySpace::getManamentApiCommandRange("failed").end));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_special_keys_cross_module_clear);
tx->reset();
}
// Management api error, and error message schema check
try {
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tx->set(LiteralStringRef("Invalid_Network_Address")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("exclude")),
ValueRef());
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_special_keys_api_failure);
Optional<Value> errorMsg = wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
tx->reset();
}
return Void();
}

View File

@ -103,7 +103,11 @@ struct TriggerRecoveryLoopWorkload : TestWorkload {
address_interface[ip_port] = it.value;
}
for (auto it : address_interface) {
tr.set(LiteralStringRef("\xff\xff/reboot_worker"), it.second);
if (cx->apiVersionAtLeast(700))
BinaryReader::fromStringRef<ClientWorkerInterface>(it.second, IncludeVersion())
.reboot.send(RebootRequest());
else
tr.set(LiteralStringRef("\xff\xff/reboot_worker"), it.second);
}
TraceEvent(SevInfo, "TriggerRecoveryLoop_AttempedKillAll");
return Void();

View File

@ -158,6 +158,10 @@ ERROR( tag_too_long, 2110, "Tag set on transaction is too long" )
ERROR( too_many_tag_throttles, 2111, "Too many tag throttles have been created" )
ERROR( special_keys_cross_module_read, 2112, "Special key space range read crosses modules. Refer to the `special_key_space_relaxed' transaction option for more details." )
ERROR( special_keys_no_module_found, 2113, "Special key space range read does not intersect a module. Refer to the `special_key_space_relaxed' transaction option for more details." )
ERROR( special_keys_write_disabled, 2114, "Special Key space is not allowed to write by default. Refer to the `special_key_space_enable_writes` transaction option for more details." )
ERROR( special_keys_no_write_module_found, 2115, "Special key space key or keyrange in set or clear does not intersect a module" )
ERROR( special_keys_cross_module_clear, 2116, "Special key space clear crosses modules" )
ERROR( special_keys_api_failure, 2117, "Api call through special keys failed. For more information, call get on special key 0xff0xff/error_message to get a json string of the error message." )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )