Merge pull request #4255 from sfc-gh-clin/add-coordinators-into-special-keys

Add coordinators into special keys
This commit is contained in:
Andrew Noyes 2021-02-18 18:26:07 -08:00 committed by GitHub
commit 9fb396e372
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 592 additions and 21 deletions

View File

@ -1009,6 +1009,69 @@ ACTOR Future<std::vector<NetworkAddress>> getCoordinators( Database cx ) {
}
}
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr, Reference<IQuorumChange> change,
std::vector<NetworkAddress>* desiredCoordinators) {
tr->setOption( FDBTransactionOptions::LOCK_AWARE );
tr->setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES );
tr->setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
Optional<Value> currentKey = wait( tr->get( coordinatorsKey ) );
if (!currentKey.present())
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key entirely?
state ClusterConnectionString old( currentKey.get().toString() );
if ( tr->getDatabase()->getConnectionFile() && old.clusterKeyName().toString() != tr->getDatabase()->getConnectionFile()->getConnectionString().clusterKeyName() )
return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database??
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
if(!desiredCoordinators->size()) {
std::vector<NetworkAddress> _desiredCoordinators = wait( change->getDesiredCoordinators( tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result ) );
*desiredCoordinators = _desiredCoordinators;
}
if (result != CoordinatorsResult::SUCCESS)
return result;
if (!desiredCoordinators->size())
return CoordinatorsResult::INVALID_NETWORK_ADDRESSES;
std::sort(desiredCoordinators->begin(), desiredCoordinators->end());
std::string newName = change->getDesiredClusterKeyName();
if (newName.empty()) newName = old.clusterKeyName().toString();
if ( old.coordinators() == *desiredCoordinators && old.clusterKeyName() == newName)
return CoordinatorsResult::SAME_NETWORK_ADDRESSES;
state ClusterConnectionString conn( *desiredCoordinators, StringRef( newName + ':' + deterministicRandom()->randomAlphaNumeric( 32 ) ) );
if(g_network->isSimulated()) {
for(int i = 0; i < (desiredCoordinators->size()/2)+1; i++) {
auto addresses = g_simulator.getProcessByAddress((*desiredCoordinators)[i])->addresses;
g_simulator.protectedAddresses.insert(addresses.address);
if(addresses.secondaryAddress.present()) {
g_simulator.protectedAddresses.insert(addresses.secondaryAddress.get());
}
TraceEvent("ProtectCoordinator").detail("Address", (*desiredCoordinators)[i]).backtrace();
}
}
vector<Future<Optional<LeaderInfo>>> leaderServers;
ClientCoordinators coord( Reference<ClusterConnectionFile>( new ClusterConnectionFile( conn ) ) );
for( int i = 0; i < coord.clientLeaderServers.size(); i++ )
leaderServers.push_back( retryBrokenPromise( coord.clientLeaderServers[i].getLeader, GetLeaderRequest( coord.clusterKey, UID() ), TaskPriority::CoordinationReply ) );
choose {
when( wait( waitForAll( leaderServers ) ) ) {}
when( wait( delay(5.0) ) ) {
return CoordinatorsResult::COORDINATOR_UNREACHABLE;
}
}
tr->set( coordinatorsKey, conn.toString() );
return Optional<CoordinatorsResult>();
}
ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChange> change) {
state Transaction tr(cx);
state int retries = 0;

View File

@ -144,6 +144,8 @@ struct IQuorumChange : ReferenceCounted<IQuorumChange> {
};
// Change to use the given set of coordination servers
ACTOR Future<Optional<CoordinatorsResult>> changeQuorumChecker(Transaction* tr, Reference<IQuorumChange> change,
std::vector<NetworkAddress>* desiredCoordinators);
ACTOR Future<CoordinatorsResult> changeQuorum(Database cx, Reference<IQuorumChange> change);
Reference<IQuorumChange> autoQuorumChange(int desired = -1);
Reference<IQuorumChange> noQuorumChange();

View File

@ -950,6 +950,16 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
SpecialKeySpace::MODULE::TRACING, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<TracingOptionsImpl>(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::CONFIGURATION, SpecialKeySpace::IMPLTYPE::READWRITE,
std::make_unique<CoordinatorsImpl>(
KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
registerSpecialKeySpaceModule(
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READONLY,
std::make_unique<CoordinatorsAutoImpl>(
singleKeyRange(LiteralStringRef("auto_coordinators"))
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
}
if (apiVersionAtLeast(630)) {
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <boost/algorithm/string.hpp>
#include "fdbclient/SpecialKeySpace.actor.h"
#include "flow/UnitTest.h"
#include "fdbclient/ManagementAPI.actor.h"
@ -27,7 +29,16 @@
namespace {
const std::string kTracingTransactionIdKey = "transaction_id";
const std::string kTracingTokenKey = "token";
static bool isAlphaNumeric(const std::string& key) {
// [A-Za-z0-9_]+
if (!key.size()) return false;
for (const char& c : key) {
if (!((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_')) return false;
}
return true;
}
} // namespace
std::unordered_map<SpecialKeySpace::MODULE, KeyRange> SpecialKeySpace::moduleToBoundary = {
{ SpecialKeySpace::MODULE::TRANSACTION,
@ -55,7 +66,9 @@ std::unordered_map<std::string, KeyRange> SpecialKeySpace::managementApiCommandT
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "lock", singleKeyRange(LiteralStringRef("db_locked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "consistencycheck", singleKeyRange(LiteralStringRef("consistency_check_suspended"))
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }
.withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) },
{ "coordinators", KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0"))
.withPrefix(moduleToBoundary[MODULE::CONFIGURATION].begin) }
};
std::set<std::string> SpecialKeySpace::options = { "excluded/force", "failed/force" };
@ -178,13 +191,12 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
TraceEvent(SevDebug, "ReadToBoundary")
.detail("TerminateKey", ks->getKey())
.detail("TerminateOffset", ks->offset);
// If still not normalized after moving to the boundary,
// If still not normalized after moving to the boundary,
// let key selector clamp up to the boundary
if (ks->offset < 1) {
result->readToBegin = true;
ks->setKey(boundary.begin);
}
else {
} else {
result->readThroughEnd = true;
ks->setKey(boundary.end);
}
@ -1296,8 +1308,7 @@ TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(k
TraceEvent("TracingOptionsImpl::TracingOptionsImpl").detail("Range", kr);
}
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
for (const auto& option : SpecialKeySpace::getTracingOptions()) {
auto key = getKeyRange().begin.withSuffix(option);
@ -1306,9 +1317,11 @@ Future<Standalone<RangeResultRef>> TracingOptionsImpl::getRange(ReadYourWritesTr
}
if (key.endsWith(kTracingTransactionIdKey)) {
result.push_back_deep(result.arena(), KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.first())));
result.push_back_deep(result.arena(),
KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.first())));
} else if (key.endsWith(kTracingTokenKey)) {
result.push_back_deep(result.arena(), KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.second())));
result.push_back_deep(result.arena(),
KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.second())));
}
}
return result;
@ -1351,3 +1364,193 @@ void TracingOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key
ryw->setSpecialKeySpaceErrorMsg("clear disabled");
throw special_keys_api_failure();
}
CoordinatorsImpl::CoordinatorsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {}
Future<Standalone<RangeResultRef>> CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const {
Standalone<RangeResultRef> result;
KeyRef prefix(getKeyRange().begin);
// the constructor of ClusterConnectionFile already checks whether the file is valid
auto cs = ClusterConnectionFile(ryw->getDatabase()->getConnectionFile()->getFilename()).getConnectionString();
auto coordinator_processes = cs.coordinators();
Key cluster_decription_key = prefix.withSuffix(LiteralStringRef("cluster_description"));
if (kr.contains(cluster_decription_key)) {
result.push_back_deep(result.arena(), KeyValueRef(cluster_decription_key, cs.clusterKeyName()));
}
// Note : the sort by string is anti intuition, ex. 1.1.1.1:11 < 1.1.1.1:5
// include :tls in keys if the network addresss is TLS
std::sort(coordinator_processes.begin(), coordinator_processes.end(),
[](const NetworkAddress& lhs, const NetworkAddress& rhs) { return lhs.toString() < rhs.toString(); });
std::string processes_str;
for (const auto& w : coordinator_processes) {
if (processes_str.size()) processes_str += ",";
processes_str += w.toString();
}
Key processes_key = prefix.withSuffix(LiteralStringRef("processes"));
if (kr.contains(processes_key)) {
result.push_back_deep(result.arena(), KeyValueRef(processes_key, Value(processes_str)));
}
return rywGetRange(ryw, kr, result);
}
ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) {
state Reference<IQuorumChange> change;
state std::vector<NetworkAddress> addressesVec;
state std::vector<std::string> process_address_strs;
state Optional<std::string> msg;
state int index;
state bool parse_error = false;
// check update for cluster_description
Key processes_key = LiteralStringRef("processes").withPrefix(kr.begin);
auto processes_entry = ryw->getSpecialKeySpaceWriteMap()[processes_key];
if (processes_entry.first) {
ASSERT(processes_entry.second.present()); // no clear should be seen here
auto processesStr = processes_entry.second.get().toString();
boost::split(process_address_strs, processesStr, [](char c) { return c == ','; });
if (!process_address_strs.size()) {
return ManagementAPIError::toJsonString(
false, "coordinators",
"New coordinators\' processes are empty, please specify new processes\' network addresses with format "
"\"IP:PORT,IP:PORT,...,IP:PORT\"");
}
for (index = 0; index < process_address_strs.size(); index++) {
try {
auto a = NetworkAddress::parse(process_address_strs[index]);
if (!a.isValid())
parse_error = true;
else
addressesVec.push_back(a);
} catch (Error& e) {
TraceEvent(SevDebug, "SpecialKeysNetworkParseError").error(e);
parse_error = true;
}
if (parse_error) {
std::string error =
"ERROR: \'" + process_address_strs[index] + "\' is not a valid network endpoint address\n";
if (process_address_strs[index].find(":tls") != std::string::npos)
error += " Do not include the `:tls' suffix when naming a process\n";
return ManagementAPIError::toJsonString(false, "coordinators", error);
}
}
}
if (addressesVec.size())
change = specifiedQuorumChange(addressesVec);
else
change = noQuorumChange();
// check update for cluster_description
Key cluster_decription_key = LiteralStringRef("cluster_description").withPrefix(kr.begin);
auto entry = ryw->getSpecialKeySpaceWriteMap()[cluster_decription_key];
if (entry.first) {
// check valid description [a-zA-Z0-9_]+
if (entry.second.present() && isAlphaNumeric(entry.second.get().toString())) {
// do the name change
change = nameQuorumChange(entry.second.get().toString(), change);
} else {
// throw the error
return Optional<std::string>(ManagementAPIError::toJsonString(
false, "coordinators", "Cluster description must match [A-Za-z0-9_]+"));
}
}
ASSERT(change.isValid());
TraceEvent(SevDebug, "SKSChangeCoordinatorsStart")
.detail("NewAddresses", describe(addressesVec))
.detail("Description", entry.first ? entry.second.get().toString() : "");
Optional<CoordinatorsResult> r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec));
TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish")
.detail("Result", r.present() ? static_cast<int>(r.get()) : -1); // -1 means success
if (r.present()) {
auto res = r.get();
std::string error_msg;
bool retriable = false;
if (res == CoordinatorsResult::INVALID_NETWORK_ADDRESSES) {
error_msg = "The specified network addresses are invalid";
} else if (res == CoordinatorsResult::SAME_NETWORK_ADDRESSES) {
error_msg = "No change (existing configuration satisfies request)";
} else if (res == CoordinatorsResult::NOT_COORDINATORS) {
error_msg = "Coordination servers are not running on the specified network addresses";
} else if (res == CoordinatorsResult::DATABASE_UNREACHABLE) {
error_msg = "Database unreachable";
} else if (res == CoordinatorsResult::BAD_DATABASE_STATE) {
error_msg = "The database is in an unexpected state from which changing coordinators might be unsafe";
} else if (res == CoordinatorsResult::COORDINATOR_UNREACHABLE) {
error_msg = "One of the specified coordinators is unreachable";
retriable = true;
} else if (res == CoordinatorsResult::NOT_ENOUGH_MACHINES) {
error_msg = "Too few fdbserver machines to provide coordination at the current redundancy level";
} else if (res == CoordinatorsResult::SUCCESS) {
TraceEvent(SevError, "SpecialKeysForCoordinators").detail("UnexpectedSuccessfulResult", "");
} else {
ASSERT(false);
}
msg = ManagementAPIError::toJsonString(retriable, "coordinators", error_msg);
}
return msg;
}
Future<Optional<std::string>> CoordinatorsImpl::commit(ReadYourWritesTransaction* ryw) {
return coordinatorsCommitActor(ryw, getKeyRange());
}
void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) {
return throwSpecialKeyApiFailure(ryw, "coordinators", "Clear range is meaningless thus forbidden for coordinators");
}
void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) {
return throwSpecialKeyApiFailure(ryw, "coordinators",
"Clear operation is meaningless thus forbidden for coordinators");
}
CoordinatorsAutoImpl::CoordinatorsAutoImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
ACTOR static Future<Standalone<RangeResultRef>> CoordinatorsAutoImplActor(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) {
state Standalone<RangeResultRef> res;
state std::string autoCoordinatorsKey;
state Transaction& tr = ryw->getTransaction();
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
Optional<Value> currentKey = wait(tr.get(coordinatorsKey));
if (!currentKey.present()) {
ryw->setSpecialKeySpaceErrorMsg(
ManagementAPIError::toJsonString(false, "auto_coordinators", "The coordinator key does not exist"));
throw special_keys_api_failure();
}
state ClusterConnectionString old(currentKey.get().toString());
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
std::vector<NetworkAddress> _desiredCoordinators = wait(autoQuorumChange()->getDesiredCoordinators(
&tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result));
if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES) {
// we could get not_enough_machines if we happen to see the database while the cluster controller is updating
// the worker list, so make sure it happens twice before returning a failure
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
true, "auto_coordinators", "The auto change attempt did not get enough machines, please try again"));
throw special_keys_api_failure();
}
for (const auto& address : _desiredCoordinators) {
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
autoCoordinatorsKey += address.toString();
}
res.push_back_deep(res.arena(), KeyValueRef(kr.begin, Value(autoCoordinatorsKey)));
return res;
}
Future<Standalone<RangeResultRef>> CoordinatorsAutoImpl::getRange(ReadYourWritesTransaction* ryw,
KeyRangeRef kr) const {
// single key range, the queried range should always be the same as the underlying range
ASSERT(kr == getKeyRange());
return CoordinatorsAutoImplActor(ryw, kr);
}

View File

@ -332,5 +332,20 @@ public:
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
};
class CoordinatorsImpl : public SpecialKeyRangeRWImpl {
public:
explicit CoordinatorsImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
Future<Optional<std::string>> commit(ReadYourWritesTransaction* ryw) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override;
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
};
class CoordinatorsAutoImpl : public SpecialKeyRangeReadImpl {
public:
explicit CoordinatorsAutoImpl(KeyRangeRef kr);
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -24,6 +24,7 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/Schemas.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct ChangeConfigWorkload : TestWorkload {
@ -65,9 +66,9 @@ struct ChangeConfigWorkload : TestWorkload {
TraceEvent("WaitForReplicasExtraEnd");
} if (self->networkAddresses.size()) {
if (self->networkAddresses == "auto")
wait(success(changeQuorum(extraDB, autoQuorumChange())));
wait(CoordinatorsChangeActor(extraDB, self, true));
else
wait(success(changeQuorum(extraDB, specifiedQuorumChange(NetworkAddress::parseList(self->networkAddresses)))));
wait(CoordinatorsChangeActor(extraDB, self));
}
wait(delay(5*deterministicRandom()->random01()));
}
@ -91,9 +92,9 @@ struct ChangeConfigWorkload : TestWorkload {
}
if( self->networkAddresses.size() ) {
if (self->networkAddresses == "auto")
wait(success( changeQuorum( cx, autoQuorumChange() ) ));
wait(CoordinatorsChangeActor(cx, self, true));
else
wait(success( changeQuorum( cx, specifiedQuorumChange(NetworkAddress::parseList( self->networkAddresses )) ) ));
wait(CoordinatorsChangeActor(cx, self));
}
if(!extraConfigureBefore) {
@ -102,6 +103,86 @@ struct ChangeConfigWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> CoordinatorsChangeActor(Database cx, ChangeConfigWorkload* self,
bool autoChange = false) {
state ReadYourWritesTransaction tr(cx);
state int notEnoughMachineResults = 0; // Retry for the second time if we first get this result
state std::string desiredCoordinatorsKey; // comma separated
if (autoChange) { // if auto, we first get the desired addresses by read \xff\xff/management/auto_coordinators
loop {
try {
Optional<Value> newCoordinatorsKey = wait(tr.get(
LiteralStringRef("auto_coordinators")
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
ASSERT(newCoordinatorsKey.present());
desiredCoordinatorsKey = newCoordinatorsKey.get().toString();
tr.reset();
break;
} catch (Error& e) {
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tr.get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
TraceEvent(SevDebug, "GetAutoCoordinatorsChange")
.detail("ErrorMessage", valueObj["message"].get_str());
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "auto_coordinators");
if (valueObj["retriable"].get_bool() && notEnoughMachineResults < 1) {
notEnoughMachineResults++;
wait(delay(1.0));
tr.reset();
} else {
break;
}
} else {
wait(tr.onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
} else {
desiredCoordinatorsKey = self->networkAddresses;
}
loop {
try {
tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
tr.set(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(desiredCoordinatorsKey));
TraceEvent(SevDebug, "CoordinatorsChangeBeforeCommit")
.detail("Auto", autoChange)
.detail("NewCoordinatorsKey", describe(desiredCoordinatorsKey));
wait(tr.commit());
ASSERT(false);
} catch (Error& e) {
state Error err(e);
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tr.get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
TraceEvent(SevDebug, "CoordinatorsChangeError")
.detail("Auto", autoChange)
.detail("ErrorMessage", valueObj["message"].get_str());
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
ASSERT(valueObj["command"].get_str() == "coordinators");
break;
} else {
wait(tr.onError(err));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
return Void();
}
};
WorkloadFactory<ChangeConfigWorkload> ChangeConfigWorkloadFactory("ChangeConfig");

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <boost/algorithm/string.hpp>
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"
@ -434,14 +436,16 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
}
// test case when registered range is the same as the underlying module
try {
state Standalone<RangeResultRef> result = wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY));
// We should have at least 1 process in the cluster
ASSERT(result.size());
state KeyValueRef entry = deterministicRandom()->randomChoice(result);
Optional<Value> singleRes = wait(tx->get(entry.key));
ASSERT(singleRes.present() && singleRes.get() == entry.value);
state Standalone<RangeResultRef> result =
wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"),
LiteralStringRef("\xff\xff/worker_interfaces0")),
CLIENT_KNOBS->TOO_MANY));
// Note: there's possibility we get zero workers
if (result.size()) {
state KeyValueRef entry = deterministicRandom()->randomChoice(result);
Optional<Value> singleRes = wait(tx->get(entry.key));
if (singleRes.present()) ASSERT(singleRes.get() == entry.value);
}
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));
@ -760,7 +764,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
Key("process/class_source/" + address)
.withPrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
ASSERT(class_source.present() && class_source.get() == LiteralStringRef("set_class"));
TraceEvent(SevDebug, "SetClassSourceDebug")
.detail("Present", class_source.present())
.detail("ClassSource", class_source.present() ? class_source.get().toString() : "__Nothing");
// Very rarely, we get an empty worker list, thus no class_source data
if (class_source.present()) ASSERT(class_source.get() == LiteralStringRef("set_class"));
tx->reset();
} else {
// If no worker process returned, skip the test
@ -875,6 +883,195 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
}
}
}
// coordinators
// test read, makes sure it's the same as reading from coordinatorsKey
loop {
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
state ClusterConnectionString cs(res.get().toString());
Optional<Value> coordinator_processes_key =
wait(tx->get(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
ASSERT(coordinator_processes_key.present());
std::vector<std::string> process_addresses;
boost::split(process_addresses, coordinator_processes_key.get().toString(),
[](char c) { return c == ','; });
ASSERT(process_addresses.size() == cs.coordinators().size());
// compare the coordinator process network addresses one by one
for (const auto& network_address : cs.coordinators()) {
ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) !=
process_addresses.end());
}
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
}
}
// test change coordinators and cluster description
// we randomly pick one process(not coordinator) and add it, in this case, it should always succeed
{
state std::string new_cluster_description = deterministicRandom()->randomAlphaNumeric(8);
state std::string new_coordinator_process;
state std::vector<std::string> old_coordinators_processes;
state bool possible_to_add_coordinator;
state KeyRange coordinators_key_range =
KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0"))
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"));
loop {
try {
// get current coordinators
Optional<Value> processes_key =
wait(tx->get(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))));
ASSERT(processes_key.present());
boost::split(old_coordinators_processes, processes_key.get().toString(),
[](char c) { return c == ','; });
// pick up one non-coordinator process if possible
vector<ProcessData> workers = wait(getWorkers(&tx->getTransaction()));
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("OldCoordinators", describe(old_coordinators_processes))
.detail("WorkerSize", workers.size());
if (workers.size() > old_coordinators_processes.size()) {
loop {
auto worker = deterministicRandom()->randomChoice(workers);
new_coordinator_process = worker.address.toString();
if (std::find(old_coordinators_processes.begin(), old_coordinators_processes.end(),
worker.address.toString()) == old_coordinators_processes.end()) {
break;
}
}
possible_to_add_coordinator = true;
} else {
possible_to_add_coordinator = false;
}
tx->reset();
break;
} catch (Error& e) {
wait(tx->onError(e));
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("NewCoordinator", possible_to_add_coordinator ? new_coordinator_process : "")
.detail("NewClusterDescription", new_cluster_description);
if (possible_to_add_coordinator) {
loop {
try {
std::string new_processes_key(new_coordinator_process);
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (const auto& address : old_coordinators_processes) {
new_processes_key += "," + address;
}
tx->set(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(new_processes_key));
// update cluster description
tx->set(LiteralStringRef("cluster_description")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(new_cluster_description));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
TraceEvent(SevDebug, "CoordinatorsManualChange").error(e);
// if we repeat doing the change, we will get the error:
// CoordinatorsResult::SAME_NETWORK_ADDRESSES
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
TraceEvent(SevDebug, "CoordinatorsManualChange")
.detail("ErrorMessage", valueObj["message"].get_str());
ASSERT(valueObj["command"].get_str() == "coordinators");
if (valueObj["retriable"].get_bool()) { // coordinators not reachable, retry
tx->reset();
} else {
ASSERT(valueObj["message"].get_str() ==
"No change (existing configuration satisfies request)");
tx->reset();
break;
}
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
// change successful, now check it is already changed
try {
tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tx->get(coordinatorsKey));
ASSERT(res.present()); // Otherwise, database is in a bad state
ClusterConnectionString cs(res.get().toString());
ASSERT(cs.coordinators().size() == old_coordinators_processes.size() + 1);
// verify the coordinators' addresses
for (const auto& network_address : cs.coordinators()) {
std::string address_str = network_address.toString();
ASSERT(std::find(old_coordinators_processes.begin(), old_coordinators_processes.end(),
address_str) != old_coordinators_processes.end() ||
new_coordinator_process == address_str);
}
// verify the cluster decription
ASSERT(new_cluster_description == cs.clusterKeyName().toString());
tx->reset();
} catch (Error& e) {
wait(tx->onError(e));
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
// change back to original settings
loop {
try {
std::string new_processes_key;
tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (const auto& address : old_coordinators_processes) {
new_processes_key += new_processes_key.size() ? "," : "";
new_processes_key += address;
}
tx->set(LiteralStringRef("processes")
.withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")),
Value(new_processes_key));
wait(tx->commit());
ASSERT(false);
} catch (Error& e) {
TraceEvent(SevDebug, "CoordinatorsManualChangeRevert").error(e);
if (e.code() == error_code_special_keys_api_failure) {
Optional<Value> errorMsg =
wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
ASSERT(errorMsg.present());
std::string errorStr;
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
// special_key_space_management_api_error_msg schema validation
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
TraceEvent(SevDebug, "CoordinatorsManualChangeRevert")
.detail("ErrorMessage", valueObj["message"].get_str());
ASSERT(valueObj["command"].get_str() == "coordinators");
if (valueObj["retriable"].get_bool()) {
tx->reset();
} else if (valueObj["message"].get_str() ==
"No change (existing configuration satisfies request)") {
tx->reset();
break;
} else {
TraceEvent(SevError, "CoordinatorsManualChangeRevert")
.detail("UnexpectedError", valueObj["message"].get_str());
throw special_keys_api_failure();
}
} else {
wait(tx->onError(e));
}
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
}
}
}
}
return Void();
}
};