From e1182f2f86e2677f61ab1f521dccc8e42ddabf7e Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 29 Jan 2021 11:45:52 -0800 Subject: [PATCH 01/13] Add coordinators to special keys --- fdbclient/ManagementAPI.actor.cpp | 71 +++++++ fdbclient/ManagementAPI.actor.h | 1 + fdbclient/NativeAPI.actor.cpp | 6 + fdbclient/SpecialKeySpace.actor.cpp | 193 +++++++++++++++++- fdbclient/SpecialKeySpace.actor.h | 9 + .../SpecialKeySpaceCorrectness.actor.cpp | 178 +++++++++++++++- 6 files changed, 446 insertions(+), 12 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 3d712d34aa..26e892767d 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1009,6 +1009,77 @@ ACTOR Future> getCoordinators( Database cx ) { } } +ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, std::vector* desiredCoordinators, int* retries, int* notEnoughMachineResults) { + tr->setOption( FDBTransactionOptions::LOCK_AWARE ); + tr->setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); + tr->setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); + Optional currentKey = wait( tr->get( coordinatorsKey ) ); + + if (!currentKey.present()) + return CoordinatorsResult::BAD_DATABASE_STATE; // Someone deleted this key entirely? + + state ClusterConnectionString old( currentKey.get().toString() ); + if ( tr->getDatabase()->getConnectionFile() && old.clusterKeyName().toString() != tr->getDatabase()->getConnectionFile()->getConnectionString().clusterKeyName() ) + return CoordinatorsResult::BAD_DATABASE_STATE; // Someone changed the "name" of the database?? + + state CoordinatorsResult result = CoordinatorsResult::SUCCESS; + if(!desiredCoordinators->size()) { + std::vector _desiredCoordinators = wait( change->getDesiredCoordinators( tr, old.coordinators(), Reference(new ClusterConnectionFile(old)), result ) ); + *desiredCoordinators = _desiredCoordinators; + } + + if(result == CoordinatorsResult::NOT_ENOUGH_MACHINES && *notEnoughMachineResults < 1) { + //we could get not_enough_machines if we happen to see the database while the cluster controller is updating the worker list, so make sure it happens twice before returning a failure + (*notEnoughMachineResults)++; + wait( delay(1.0) ); + tr->reset(); + return Optional(); + } + if (result != CoordinatorsResult::SUCCESS) + return result; + if (!desiredCoordinators->size()) + return CoordinatorsResult::INVALID_NETWORK_ADDRESSES; + std::sort(desiredCoordinators->begin(), desiredCoordinators->end()); + + std::string newName = change->getDesiredClusterKeyName(); + if (newName.empty()) newName = old.clusterKeyName().toString(); + + if ( old.coordinators() == *desiredCoordinators && old.clusterKeyName() == newName) + return *retries ? CoordinatorsResult::SUCCESS : CoordinatorsResult::SAME_NETWORK_ADDRESSES; + + state ClusterConnectionString conn( *desiredCoordinators, StringRef( newName + ':' + deterministicRandom()->randomAlphaNumeric( 32 ) ) ); + + if(g_network->isSimulated()) { + for(int i = 0; i < (desiredCoordinators->size()/2)+1; i++) { + auto addresses = g_simulator.getProcessByAddress((*desiredCoordinators)[i])->addresses; + + g_simulator.protectedAddresses.insert(addresses.address); + if(addresses.secondaryAddress.present()) { + g_simulator.protectedAddresses.insert(addresses.secondaryAddress.get()); + } + TraceEvent("ProtectCoordinator").detail("Address", (*desiredCoordinators)[i]).backtrace(); + } + } + + TraceEvent("AttemptingQuorumChange").detail("FromCS", old.toString()).detail("ToCS", conn.toString()); + TEST(old.clusterKeyName() != conn.clusterKeyName()); // Quorum change with new name + TEST(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name + + vector>> leaderServers; + ClientCoordinators coord( Reference( new ClusterConnectionFile( conn ) ) ); + for( int i = 0; i < coord.clientLeaderServers.size(); i++ ) + leaderServers.push_back( retryBrokenPromise( coord.clientLeaderServers[i].getLeader, GetLeaderRequest( coord.clusterKey, UID() ), TaskPriority::CoordinationReply ) ); + + choose { + when( wait( waitForAll( leaderServers ) ) ) {} + when( wait( delay(5.0) ) ) { + return CoordinatorsResult::COORDINATOR_UNREACHABLE; + } + } + tr->set( coordinatorsKey, conn.toString() ); + return Optional(); +} + ACTOR Future changeQuorum(Database cx, Reference change) { state Transaction tr(cx); state int retries = 0; diff --git a/fdbclient/ManagementAPI.actor.h b/fdbclient/ManagementAPI.actor.h index bbd69b589f..e013570157 100644 --- a/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/ManagementAPI.actor.h @@ -144,6 +144,7 @@ struct IQuorumChange : ReferenceCounted { }; // Change to use the given set of coordination servers +ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, std::vector* desiredCoordinators, int* retries, int* notEnoughMachineResults); ACTOR Future changeQuorum(Database cx, Reference change); Reference autoQuorumChange(int desired = -1); Reference noQuorumChange(); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 85c83db335..58652c75c9 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -950,6 +950,12 @@ DatabaseContext::DatabaseContext(Reference( SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::CONFIGURATION, SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)) + ); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 5b3dbc90c9..581acab861 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -27,7 +27,16 @@ namespace { const std::string kTracingTransactionIdKey = "transaction_id"; const std::string kTracingTokenKey = "token"; + +static bool isAlphaNumeric(const std::string& key) { + // [A-Za-z0-9_]+ + if (!key.size()) return false; + for (const char& c : key) { + if (!((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_')) return false; + } + return true; } +} // namespace std::unordered_map SpecialKeySpace::moduleToBoundary = { { SpecialKeySpace::MODULE::TRANSACTION, @@ -55,10 +64,12 @@ std::unordered_map SpecialKeySpace::managementApiCommandT .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, { "lock", singleKeyRange(LiteralStringRef("db_locked")).withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, { "consistencycheck", singleKeyRange(LiteralStringRef("consistency_check_suspended")) - .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) } + .withPrefix(moduleToBoundary[MODULE::MANAGEMENT].begin) }, + { "coordinators", KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0")) + .withPrefix(moduleToBoundary[MODULE::CONFIGURATION].begin) } }; -std::set SpecialKeySpace::options = { "excluded/force", "failed/force" }; +std::set SpecialKeySpace::options = { "excluded/force", "failed/force", "coordinators/auto" }; std::set SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey }; @@ -178,13 +189,12 @@ ACTOR Future normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite TraceEvent(SevDebug, "ReadToBoundary") .detail("TerminateKey", ks->getKey()) .detail("TerminateOffset", ks->offset); - // If still not normalized after moving to the boundary, + // If still not normalized after moving to the boundary, // let key selector clamp up to the boundary if (ks->offset < 1) { result->readToBegin = true; ks->setKey(boundary.begin); - } - else { + } else { result->readThroughEnd = true; ks->setKey(boundary.end); } @@ -481,11 +491,24 @@ KeyRange SpecialKeySpace::decode(const KeyRangeRef& kr) { return KeyRangeRef(begin->value()->decode(kr.begin), begin->value()->decode(kr.end)); } +// Sometimes, we need to force impl::commit to execute without write to the underlying key range +void precommitUpdate(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, + std::set& writeModulePtrs) { + // if "coordinators/auto" is set, make sure we execute CoordinatorsImpl::commit + Key option_key = SpecialKeySpace::getManagementApiCommandOptionSpecialKey("coordinators", "auto"); + auto entry = ryw->getSpecialKeySpaceWriteMap()[option_key]; + if (entry.first) { + writeModulePtrs.insert( + sks->getRWImpls().rangeContaining(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))->value()); + } +} + ACTOR Future commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw) { state RangeMap>, KeyRangeRef>::Ranges ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys); state RangeMap>, KeyRangeRef>::iterator iter = ranges.begin(); state std::set writeModulePtrs; + precommitUpdate(sks, ryw, writeModulePtrs); while (iter != ranges.end()) { std::pair> entry = iter->value(); if (entry.first) { @@ -638,6 +661,13 @@ void ManagementCommandsOptionsImpl::set(ReadYourWritesTransaction* ryw, const Ke TraceEvent(SevDebug, "ManagementApiOption").detail("Option", option).detail("Key", key); ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional(value))); } + // a hack to make sure we execute CoordinatorsImpl::commit + // if (key == SpecialKeySpace::getManagementApiCommandOptionSpecialKey("coordinators", "auto")) { + // Key hack_key = + // LiteralStringRef("option/auto").withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")); + // // since clear is forbidden in coordinators special key range, this is okay + // ryw->getSpecialKeySpaceWriteMap().insert(hack_key, std::make_pair(true, Optional())); + // } } void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) { @@ -1296,8 +1326,7 @@ TracingOptionsImpl::TracingOptionsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(k TraceEvent("TracingOptionsImpl::TracingOptionsImpl").detail("Range", kr); } -Future> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, - KeyRangeRef kr) const { +Future> TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { Standalone result; for (const auto& option : SpecialKeySpace::getTracingOptions()) { auto key = getKeyRange().begin.withSuffix(option); @@ -1306,9 +1335,11 @@ Future> TracingOptionsImpl::getRange(ReadYourWritesTr } if (key.endsWith(kTracingTransactionIdKey)) { - result.push_back_deep(result.arena(), KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.first()))); + result.push_back_deep(result.arena(), + KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.first()))); } else if (key.endsWith(kTracingTokenKey)) { - result.push_back_deep(result.arena(), KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.second()))); + result.push_back_deep(result.arena(), + KeyValueRef(key, std::to_string(ryw->getTransactionInfo().spanID.second()))); } } return result; @@ -1351,3 +1382,147 @@ void TracingOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key ryw->setSpecialKeySpaceErrorMsg("clear disabled"); throw special_keys_api_failure(); } + +CoordinatorsImpl::CoordinatorsImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +Future> CoordinatorsImpl::getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const { + Standalone result; + KeyRef prefix(getKeyRange().begin); + // the constructor of ClusterConnectionFile already checks whether the file is valid + auto cs = ClusterConnectionFile(ryw->getDatabase()->getConnectionFile()->getFilename()).getConnectionString(); + auto coordinator_processes = cs.coordinators(); + Key cluster_decription_key = prefix.withSuffix(LiteralStringRef("cluster_description")); + if (kr.contains(cluster_decription_key)) { + result.push_back_deep(result.arena(), KeyValueRef(cluster_decription_key, cs.clusterKeyName())); + } + // Note : the sort by string is anti intuition, ex. 1.1.1.1:11 < 1.1.1.1:5 + // include :tls in keys if the network addresss is TLS + std::sort(coordinator_processes.begin(), coordinator_processes.end(), + [](const NetworkAddress& lhs, const NetworkAddress& rhs) { + // return formatIpPort(lhs.ip, lhs.port) < formatIpPort(rhs.ip, rhs.port); + return lhs.toString() < rhs.toString(); + }); + for (auto& w : coordinator_processes) { + Key k(prefix.withSuffix(LiteralStringRef("process/")).withSuffix(w.toString()));//formatIpPort(w.ip, w.port))); + if (kr.contains(k)) { + result.push_back(result.arena(), KeyValueRef(k, ValueRef())); + result.arena().dependsOn(k.arena()); + } + } + return rywGetRange(ryw, kr, result); +} + +ACTOR static Future> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { + state Reference change; + state std::vector addressesVec; + state Optional msg; + state int retry = 0; + state int notEnoughMachineResults; + + KeyRange process_range = + KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0")).withPrefix(kr.begin); + auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(process_range); + + auto iter = ranges.begin(); + while (iter != ranges.end()) { + auto entry = iter->value(); + if (entry.first) { + ASSERT(entry.second.present()); // no clear should be seen here + Key address = iter->begin().removePrefix(process_range.begin); + auto a = NetworkAddress::parse(address.toString()); + if (!a.isValid()) { + std::string error = "ERROR: \'" + address.toString() + "\' is not a valid network endpoint address\n"; + if (address.toString().find(":tls") != std::string::npos) + error += " Do not include the `:tls' suffix when naming a process\n"; + msg = ManagementAPIError::toJsonString(false, "coordinators", error); + return msg; + } + addressesVec.push_back(a); + } + ++iter; + } + + // check auto option + auto auto_option = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey( + "coordinators", "auto")]; + if (auto_option.first) { + // auto option is speicified, check the hack_key we use is set + // auto hack_entry = + // ryw->getSpecialKeySpaceWriteMap()[LiteralStringRef("option/auto") + // .withPrefix( + // SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))]; + // ASSERT(hack_entry.first && !hack_entry.second.present()); + change = autoQuorumChange(); + } else { + if (addressesVec.size()) + change = specifiedQuorumChange(addressesVec); + else + change = noQuorumChange(); + } + + // check update for cluster_description + Key cluster_decription_key = LiteralStringRef("cluster_description").withPrefix(kr.begin); + auto entry = ryw->getSpecialKeySpaceWriteMap()[cluster_decription_key]; + if (entry.first) { + // check valid description [a-zA-Z0-9_]+ + if (entry.second.present() && isAlphaNumeric(entry.second.get().toString())) { + // do the name change + change = nameQuorumChange(entry.second.get().toString(), change); + } else { + // throw the error + return Optional(ManagementAPIError::toJsonString( + false, "coordinators", "Cluster description must match [A-Za-z0-9_]+")); + } + } + + ASSERT(change.isValid()); + + TraceEvent(SevDebug, "SKSChangeCoordinatorsStart") + .detail("NewAddresses", describe(addressesVec)) + .detail("Auto", auto_option.first) + .detail("Description", entry.first ? entry.second.get().toString() : ""); + + Optional r = + wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec, &retry, ¬EnoughMachineResults)); + + TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish") + .detail("Result", r.present() ? int(r.get()) : -1); // -1 means success + if (r.present()) { + auto res = r.get(); + std::string error_msg; + if (res == CoordinatorsResult::INVALID_NETWORK_ADDRESSES) { + error_msg = "The specified network addresses are invalid"; + } else if (res == CoordinatorsResult::SAME_NETWORK_ADDRESSES) { + error_msg = "No change (existing configuration satisfies request)"; + } else if (res == CoordinatorsResult::NOT_COORDINATORS) { + error_msg = "Coordination servers are not running on the specified network addresses"; + } else if (res == CoordinatorsResult::DATABASE_UNREACHABLE) { + error_msg = "Database unreachable"; + } else if (res == CoordinatorsResult::BAD_DATABASE_STATE) { + error_msg = "The database is in an unexpected state from which changing coordinators might be unsafe"; + } else if (res == CoordinatorsResult::COORDINATOR_UNREACHABLE) { + error_msg = "One of the specified coordinators is unreachable"; + } else if (res == CoordinatorsResult::NOT_ENOUGH_MACHINES) { + error_msg = "Too few fdbserver machines to provide coordination at the current redundancy level"; + } else if (res == CoordinatorsResult::SUCCESS) { + TraceEvent(SevError, "SpecialKeysForCoordinators").detail("UnexpectedSuccessfulResult", ""); + } else { + ASSERT(false); + } + msg = ManagementAPIError::toJsonString(false, "coordinators", error_msg); + } + return msg; +} + +Future> CoordinatorsImpl::commit(ReadYourWritesTransaction* ryw) { + return coordinatorsCommitActor(ryw, getKeyRange()); +} + +void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) { + return throwSpecialKeyApiFailure(ryw, "coordinators", "Clear range is meaningless thus forbidden for coordinators"); +} + +void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + return throwSpecialKeyApiFailure(ryw, "coordinators", + "Clear operation is meaningless thus forbidden for coordinators"); +} \ No newline at end of file diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 9e428abfee..b606937fd3 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -332,5 +332,14 @@ public: void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; }; +class CoordinatorsImpl : public SpecialKeyRangeRWImpl { +public: + explicit CoordinatorsImpl(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; + Future> commit(ReadYourWritesTransaction* ryw) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; +}; + #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index 5750073297..3ef4f2892a 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -434,9 +434,10 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { } // test case when registered range is the same as the underlying module try { - state Standalone result = wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), - LiteralStringRef("\xff\xff/worker_interfaces0")), - CLIENT_KNOBS->TOO_MANY)); + state Standalone result = + wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), + LiteralStringRef("\xff\xff/worker_interfaces0")), + CLIENT_KNOBS->TOO_MANY)); // We should have at least 1 process in the cluster ASSERT(result.size()); state KeyValueRef entry = deterministicRandom()->randomChoice(result); @@ -875,6 +876,177 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { } } } + // coordinators + // test read, makes sure it's the same as reading from coordinatorsKey + loop { + try { + tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + Optional res = wait(tx->get(coordinatorsKey)); + ASSERT(res.present()); // Otherwise, database is in a bad state + state ClusterConnectionString cs(res.get().toString()); + state KeyRange coordinator_process_key_range = + KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0")) + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")); + Standalone coordinator_process_kvs = + wait(tx->getRange(coordinator_process_key_range, CLIENT_KNOBS->TOO_MANY)); + ASSERT(!coordinator_process_kvs.more); + ASSERT(self->getRangeResultInOrder(coordinator_process_kvs)); + ASSERT(coordinator_process_kvs.size() == cs.coordinators().size()); + // compare the coordinator process network addresses one by one + for (const auto& network_address : cs.coordinators()) { + Key addr = Key(network_address.toString())//formatIpPort(network_address.ip, network_address.port)) + .withPrefix(coordinator_process_key_range.begin); + KeyValueRef kv(addr, ValueRef()); + ASSERT(std::find(coordinator_process_kvs.begin(), coordinator_process_kvs.end(), kv) != + coordinator_process_kvs.end()); + } + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + // test change coordinators and cluster description + // we randomly pick one process(not coordinator) and add it, in this case, it should always succeed + { + state std::string new_cluster_description = deterministicRandom()->randomAlphaNumeric(8); + state Key new_coordinator_process; + state Standalone old_coordinators_kvs; + state bool possible_to_add_coordinator; + state KeyRange coordinators_key_range = + KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0")) + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")); + loop { + try { + // get current coordinators + Standalone _coordinators_kvs = + wait(tx->getRange(coordinators_key_range, CLIENT_KNOBS->TOO_MANY)); + old_coordinators_kvs = _coordinators_kvs; + // pick up one non-coordinator process if possible + vector workers = wait(getWorkers(&tx->getTransaction())); + TraceEvent(SevDebug, "CoordinatorsManualChange") + .detail("OldCoordinators", old_coordinators_kvs.size()) + .detail("WorkerSize", workers.size()); + if (workers.size() > old_coordinators_kvs.size()) { + loop { + auto worker = deterministicRandom()->randomChoice(workers); + new_coordinator_process = Key(worker.address.toString())//formatIpPort(worker.address.ip, worker.address.port)) + .withPrefix(coordinators_key_range.begin); + KeyValueRef kv(new_coordinator_process, ValueRef()); + if (std::find(old_coordinators_kvs.begin(), old_coordinators_kvs.end(), kv) == + old_coordinators_kvs.end()) { + break; + } + } + possible_to_add_coordinator = true; + } else { + possible_to_add_coordinator = false; + } + tx->reset(); + break; + } catch (Error& e) { + wait(tx->onError(e)); + } + } + TraceEvent(SevDebug, "CoordinatorsManualChange") + .detail("NewCoordinator", possible_to_add_coordinator ? new_coordinator_process.toString() : "") + .detail("NewClusterDescription", new_cluster_description); + if (possible_to_add_coordinator) { + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + for (const auto& kv : old_coordinators_kvs) { + // TraceEvent(SevDebug, "CoordinatorsManualChange").detail("AddressKey", kv.key.toString()); + tx->set(kv.key, kv.value); + } + // TraceEvent(SevDebug, "CoordinatorsManualChange") + // .detail("AddressKey", new_coordinator_process.toString()); + tx->set(new_coordinator_process, ValueRef()); + // update cluster description + tx->set(LiteralStringRef("cluster_description") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")), + Value(new_cluster_description)); + wait(tx->commit()); + tx->reset(); + break; + } catch (Error& e) { + TraceEvent(SevDebug, "CoordinatorsManualChange").error(e); + // if we repeat doing the change, we will get this error: + // CoordinatorsResult::SAME_NETWORK_ADDRESSES + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "coordinators" && + !valueObj["retriable"].get_bool()); + ASSERT(valueObj["message"].get_str() == + "No change (existing configuration satisfies request)"); + tx->reset(); + break; + } else { + wait(tx->onError(e)); + } + } + } + // change successful, now check it is already changed + try { + tx->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + Optional res = wait(tx->get(coordinatorsKey)); + ASSERT(res.present()); // Otherwise, database is in a bad state + ClusterConnectionString cs(res.get().toString()); + ASSERT(cs.coordinators().size() == old_coordinators_kvs.size() + 1); + // verify the coordinators' addresses + for (const auto& network_address : cs.coordinators()) { + Key addr = Key(network_address.toString())//formatIpPort(network_address.ip, network_address.port)) + .withPrefix(coordinators_key_range.begin); + KeyValueRef kv(addr, ValueRef()); + ASSERT(std::find(old_coordinators_kvs.begin(), old_coordinators_kvs.end(), kv) != + old_coordinators_kvs.end() || + new_coordinator_process == addr); + } + // verify the cluster decription + TraceEvent(SevDebug, "CoordinatorsManualChange") + .detail("NewClsuterDescription", cs.clusterKeyName()); + ASSERT(new_cluster_description == cs.clusterKeyName().toString()); + tx->reset(); + } catch (Error& e) { + wait(tx->onError(e)); + } + } + } + // test coordinators' "auto" option + loop { + try { + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tx->set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey("coordinators", "auto"), ValueRef()); + wait(tx->commit()); // if an "auto" change happened, the commit may or may not succeed + tx->reset(); + } catch (Error& e) { + TraceEvent(SevDebug, "CoordinatorsAutoChange").error(e); + // if we repeat doing "auto" change, we will get this error: CoordinatorsResult::SAME_NETWORK_ADDRESSES + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "coordinators" && !valueObj["retriable"].get_bool()); + ASSERT(valueObj["message"].get_str() == "No change (existing configuration satisfies request)"); + tx->reset(); + break; + } else { + wait(tx->onError(e)); + } + } + } return Void(); } }; From 5a62e0c0848a82122b170672ef540e6214b5a041 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 29 Jan 2021 12:20:30 -0800 Subject: [PATCH 02/13] Fix repeated code usage --- fdbclient/ManagementAPI.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 26e892767d..43073855c8 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1062,8 +1062,8 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, } TraceEvent("AttemptingQuorumChange").detail("FromCS", old.toString()).detail("ToCS", conn.toString()); - TEST(old.clusterKeyName() != conn.clusterKeyName()); // Quorum change with new name - TEST(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name + ASSERT(old.clusterKeyName() != conn.clusterKeyName()); // Quorum change with new name + ASSERT(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name vector>> leaderServers; ClientCoordinators coord( Reference( new ClusterConnectionFile( conn ) ) ); From 766de6574ab8a45a934e2ad4f7026b844c5bd466 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 29 Jan 2021 15:32:53 -0800 Subject: [PATCH 03/13] Add trace --- fdbclient/ManagementAPI.actor.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 43073855c8..24c5d59e1f 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1061,9 +1061,11 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, } } - TraceEvent("AttemptingQuorumChange").detail("FromCS", old.toString()).detail("ToCS", conn.toString()); - ASSERT(old.clusterKeyName() != conn.clusterKeyName()); // Quorum change with new name - ASSERT(old.clusterKeyName() == conn.clusterKeyName()); // Quorum change with unchanged name + TraceEvent("AttemptingQuorumChange") + .detail("FromCS", old.toString()) + .detail("ToCS", conn.toString()) + .detail("OldClusterDescription", old.clusterKeyName()) + .detail("NewClusterDescription", conn.clusterKeyName()); vector>> leaderServers; ClientCoordinators coord( Reference( new ClusterConnectionFile( conn ) ) ); From c2aedb0b94547641d07b81c40f9078edf3aa3246 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 29 Jan 2021 18:20:09 -0800 Subject: [PATCH 04/13] clang-format, remove comments --- fdbclient/NativeAPI.actor.cpp | 9 ++++----- fdbclient/SpecialKeySpace.actor.cpp | 22 +++++----------------- 2 files changed, 9 insertions(+), 22 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 3b47d8a7ba..949915e70f 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -951,11 +951,10 @@ DatabaseContext::DatabaseContext(Reference( SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::TRACING))); registerSpecialKeySpaceModule( - SpecialKeySpace::MODULE::CONFIGURATION, SpecialKeySpace::IMPLTYPE::READWRITE, - std::make_unique( - KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0")) - .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)) - ); + SpecialKeySpace::MODULE::CONFIGURATION, SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 581acab861..569f85c5cf 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -661,13 +661,6 @@ void ManagementCommandsOptionsImpl::set(ReadYourWritesTransaction* ryw, const Ke TraceEvent(SevDebug, "ManagementApiOption").detail("Option", option).detail("Key", key); ryw->getSpecialKeySpaceWriteMap().insert(key, std::make_pair(true, Optional(value))); } - // a hack to make sure we execute CoordinatorsImpl::commit - // if (key == SpecialKeySpace::getManagementApiCommandOptionSpecialKey("coordinators", "auto")) { - // Key hack_key = - // LiteralStringRef("option/auto").withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")); - // // since clear is forbidden in coordinators special key range, this is okay - // ryw->getSpecialKeySpaceWriteMap().insert(hack_key, std::make_pair(true, Optional())); - // } } void ManagementCommandsOptionsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) { @@ -1399,11 +1392,12 @@ Future> CoordinatorsImpl::getRange(ReadYourWritesTran // include :tls in keys if the network addresss is TLS std::sort(coordinator_processes.begin(), coordinator_processes.end(), [](const NetworkAddress& lhs, const NetworkAddress& rhs) { - // return formatIpPort(lhs.ip, lhs.port) < formatIpPort(rhs.ip, rhs.port); - return lhs.toString() < rhs.toString(); + // return formatIpPort(lhs.ip, lhs.port) < formatIpPort(rhs.ip, rhs.port); + return lhs.toString() < rhs.toString(); }); for (auto& w : coordinator_processes) { - Key k(prefix.withSuffix(LiteralStringRef("process/")).withSuffix(w.toString()));//formatIpPort(w.ip, w.port))); + Key k(prefix.withSuffix(LiteralStringRef("process/")).withSuffix(w.toString())); // formatIpPort(w.ip, + // w.port))); if (kr.contains(k)) { result.push_back(result.arena(), KeyValueRef(k, ValueRef())); result.arena().dependsOn(k.arena()); @@ -1446,12 +1440,6 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite auto auto_option = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey( "coordinators", "auto")]; if (auto_option.first) { - // auto option is speicified, check the hack_key we use is set - // auto hack_entry = - // ryw->getSpecialKeySpaceWriteMap()[LiteralStringRef("option/auto") - // .withPrefix( - // SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))]; - // ASSERT(hack_entry.first && !hack_entry.second.present()); change = autoQuorumChange(); } else { if (addressesVec.size()) @@ -1525,4 +1513,4 @@ void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { return throwSpecialKeyApiFailure(ryw, "coordinators", "Clear operation is meaningless thus forbidden for coordinators"); -} \ No newline at end of file +} From d185ff37521b832e363512baac5910feb050aa5c Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Fri, 5 Feb 2021 00:55:34 -0800 Subject: [PATCH 05/13] update \xff\xff/configuration/coordinators/processes --- fdbclient/SpecialKeySpace.actor.cpp | 74 ++++++++++-------- .../SpecialKeySpaceCorrectness.actor.cpp | 77 +++++++++---------- 2 files changed, 82 insertions(+), 69 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 569f85c5cf..2c2038df73 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -18,6 +18,8 @@ * limitations under the License. */ +#include + #include "fdbclient/SpecialKeySpace.actor.h" #include "flow/UnitTest.h" #include "fdbclient/ManagementAPI.actor.h" @@ -1391,17 +1393,15 @@ Future> CoordinatorsImpl::getRange(ReadYourWritesTran // Note : the sort by string is anti intuition, ex. 1.1.1.1:11 < 1.1.1.1:5 // include :tls in keys if the network addresss is TLS std::sort(coordinator_processes.begin(), coordinator_processes.end(), - [](const NetworkAddress& lhs, const NetworkAddress& rhs) { - // return formatIpPort(lhs.ip, lhs.port) < formatIpPort(rhs.ip, rhs.port); - return lhs.toString() < rhs.toString(); - }); + [](const NetworkAddress& lhs, const NetworkAddress& rhs) { return lhs.toString() < rhs.toString(); }); + std::string processes_str; for (auto& w : coordinator_processes) { - Key k(prefix.withSuffix(LiteralStringRef("process/")).withSuffix(w.toString())); // formatIpPort(w.ip, - // w.port))); - if (kr.contains(k)) { - result.push_back(result.arena(), KeyValueRef(k, ValueRef())); - result.arena().dependsOn(k.arena()); - } + if (processes_str.size()) processes_str += ","; + processes_str += w.toString(); + } + Key processes_key = prefix.withSuffix(LiteralStringRef("processes")); + if (kr.contains(processes_key)) { + result.push_back_deep(result.arena(), KeyValueRef(processes_key, Value(processes_str))); } return rywGetRange(ryw, kr, result); } @@ -1409,31 +1409,45 @@ Future> CoordinatorsImpl::getRange(ReadYourWritesTran ACTOR static Future> coordinatorsCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { state Reference change; state std::vector addressesVec; + state std::vector process_address_strs; state Optional msg; state int retry = 0; + state int index; state int notEnoughMachineResults; + state bool parse_error = false; - KeyRange process_range = - KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0")).withPrefix(kr.begin); - auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(process_range); - - auto iter = ranges.begin(); - while (iter != ranges.end()) { - auto entry = iter->value(); - if (entry.first) { - ASSERT(entry.second.present()); // no clear should be seen here - Key address = iter->begin().removePrefix(process_range.begin); - auto a = NetworkAddress::parse(address.toString()); - if (!a.isValid()) { - std::string error = "ERROR: \'" + address.toString() + "\' is not a valid network endpoint address\n"; - if (address.toString().find(":tls") != std::string::npos) - error += " Do not include the `:tls' suffix when naming a process\n"; - msg = ManagementAPIError::toJsonString(false, "coordinators", error); - return msg; - } - addressesVec.push_back(a); + // check update for cluster_description + Key processes_key = LiteralStringRef("processes").withPrefix(kr.begin); + auto processes_entry = ryw->getSpecialKeySpaceWriteMap()[processes_key]; + if (processes_entry.first) { + ASSERT(processes_entry.second.present()); // no clear should be seen here + auto processesStr = processes_entry.second.get().toString(); + boost::split(process_address_strs, processesStr, [](char c) { return c == ','; }); + if (!process_address_strs.size()) { + return ManagementAPIError::toJsonString( + false, "coordinators", + "New coordinators\' processes are empty, please specify new processes\' network addresses with format " + "\"IP:PORT,IP:PORT,...,IP:PORT\""); + } + for (index = 0; index < process_address_strs.size(); index++) { + try { + auto a = NetworkAddress::parse(process_address_strs[index]); + if (!a.isValid()) + parse_error = true; + else + addressesVec.push_back(a); + } catch (Error& e) { + TraceEvent(SevDebug, "SpeicalKeysNetworkParseError").error(e); + parse_error = true; + } + + if (parse_error) { + std::string error = "ERROR: \'" + process_address_strs[index] + "\' is not a valid network endpoint address\n"; + if (process_address_strs[index].find(":tls") != std::string::npos) + error += " Do not include the `:tls' suffix when naming a process\n"; + return ManagementAPIError::toJsonString(false, "coordinators", error); + } } - ++iter; } // check auto option diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index 3ef4f2892a..af78842214 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -18,6 +18,8 @@ * limitations under the License. */ +#include + #include "fdbclient/ManagementAPI.actor.h" #include "fdbclient/NativeAPI.actor.h" #include "fdbclient/ReadYourWrites.h" @@ -884,21 +886,18 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { Optional res = wait(tx->get(coordinatorsKey)); ASSERT(res.present()); // Otherwise, database is in a bad state state ClusterConnectionString cs(res.get().toString()); - state KeyRange coordinator_process_key_range = - KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0")) - .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")); - Standalone coordinator_process_kvs = - wait(tx->getRange(coordinator_process_key_range, CLIENT_KNOBS->TOO_MANY)); - ASSERT(!coordinator_process_kvs.more); - ASSERT(self->getRangeResultInOrder(coordinator_process_kvs)); - ASSERT(coordinator_process_kvs.size() == cs.coordinators().size()); + Optional coordinator_processes_key = + wait(tx->get(LiteralStringRef("processes") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")))); + ASSERT(coordinator_processes_key.present()); + std::vector process_addresses; + boost::split(process_addresses, coordinator_processes_key.get().toString(), + [](char c) { return c == ','; }); + ASSERT(process_addresses.size() == cs.coordinators().size()); // compare the coordinator process network addresses one by one for (const auto& network_address : cs.coordinators()) { - Key addr = Key(network_address.toString())//formatIpPort(network_address.ip, network_address.port)) - .withPrefix(coordinator_process_key_range.begin); - KeyValueRef kv(addr, ValueRef()); - ASSERT(std::find(coordinator_process_kvs.begin(), coordinator_process_kvs.end(), kv) != - coordinator_process_kvs.end()); + ASSERT(std::find(process_addresses.begin(), process_addresses.end(), network_address.toString()) != + process_addresses.end()); } tx->reset(); break; @@ -910,8 +909,8 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { // we randomly pick one process(not coordinator) and add it, in this case, it should always succeed { state std::string new_cluster_description = deterministicRandom()->randomAlphaNumeric(8); - state Key new_coordinator_process; - state Standalone old_coordinators_kvs; + state std::string new_coordinator_process; + state std::vector old_coordinators_processes; state bool possible_to_add_coordinator; state KeyRange coordinators_key_range = KeyRangeRef(LiteralStringRef("process/"), LiteralStringRef("process0")) @@ -919,22 +918,24 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { loop { try { // get current coordinators - Standalone _coordinators_kvs = - wait(tx->getRange(coordinators_key_range, CLIENT_KNOBS->TOO_MANY)); - old_coordinators_kvs = _coordinators_kvs; + Optional processes_key = + wait(tx->get(LiteralStringRef("processes") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")))); + ASSERT(processes_key.present()); + boost::split(old_coordinators_processes, processes_key.get().toString(), + [](char c) { return c == ','; }); // pick up one non-coordinator process if possible vector workers = wait(getWorkers(&tx->getTransaction())); TraceEvent(SevDebug, "CoordinatorsManualChange") - .detail("OldCoordinators", old_coordinators_kvs.size()) + .detail("OldCoordinators", describe(old_coordinators_processes)) .detail("WorkerSize", workers.size()); - if (workers.size() > old_coordinators_kvs.size()) { + if (workers.size() > old_coordinators_processes.size()) { loop { auto worker = deterministicRandom()->randomChoice(workers); - new_coordinator_process = Key(worker.address.toString())//formatIpPort(worker.address.ip, worker.address.port)) - .withPrefix(coordinators_key_range.begin); - KeyValueRef kv(new_coordinator_process, ValueRef()); - if (std::find(old_coordinators_kvs.begin(), old_coordinators_kvs.end(), kv) == - old_coordinators_kvs.end()) { + new_coordinator_process = + worker.address.toString(); + if (std::find(old_coordinators_processes.begin(), old_coordinators_processes.end(), + worker.address.toString()) == old_coordinators_processes.end()) { break; } } @@ -949,19 +950,19 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { } } TraceEvent(SevDebug, "CoordinatorsManualChange") - .detail("NewCoordinator", possible_to_add_coordinator ? new_coordinator_process.toString() : "") + .detail("NewCoordinator", possible_to_add_coordinator ? new_coordinator_process : "") .detail("NewClusterDescription", new_cluster_description); if (possible_to_add_coordinator) { loop { try { + std::string new_processes_key(new_coordinator_process); tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); - for (const auto& kv : old_coordinators_kvs) { - // TraceEvent(SevDebug, "CoordinatorsManualChange").detail("AddressKey", kv.key.toString()); - tx->set(kv.key, kv.value); + for (const auto& address : old_coordinators_processes) { + new_processes_key += "," + address; } - // TraceEvent(SevDebug, "CoordinatorsManualChange") - // .detail("AddressKey", new_coordinator_process.toString()); - tx->set(new_coordinator_process, ValueRef()); + tx->set(LiteralStringRef("processes") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")), + Value(new_processes_key)); // update cluster description tx->set(LiteralStringRef("cluster_description") .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")), @@ -999,15 +1000,13 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { Optional res = wait(tx->get(coordinatorsKey)); ASSERT(res.present()); // Otherwise, database is in a bad state ClusterConnectionString cs(res.get().toString()); - ASSERT(cs.coordinators().size() == old_coordinators_kvs.size() + 1); + ASSERT(cs.coordinators().size() == old_coordinators_processes.size() + 1); // verify the coordinators' addresses for (const auto& network_address : cs.coordinators()) { - Key addr = Key(network_address.toString())//formatIpPort(network_address.ip, network_address.port)) - .withPrefix(coordinators_key_range.begin); - KeyValueRef kv(addr, ValueRef()); - ASSERT(std::find(old_coordinators_kvs.begin(), old_coordinators_kvs.end(), kv) != - old_coordinators_kvs.end() || - new_coordinator_process == addr); + std::string address_str = network_address.toString(); + ASSERT(std::find(old_coordinators_processes.begin(), old_coordinators_processes.end(), + address_str) != old_coordinators_processes.end() || + new_coordinator_process == address_str); } // verify the cluster decription TraceEvent(SevDebug, "CoordinatorsManualChange") From b10959be0febca3eddc7822ab14795c56d004c10 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Mon, 8 Feb 2021 18:52:36 -0800 Subject: [PATCH 06/13] format code, add trace --- fdbclient/SpecialKeySpace.actor.cpp | 3 ++- .../workloads/SpecialKeySpaceCorrectness.actor.cpp | 10 +++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 2c2038df73..9d544d37a6 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1442,7 +1442,8 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite } if (parse_error) { - std::string error = "ERROR: \'" + process_address_strs[index] + "\' is not a valid network endpoint address\n"; + std::string error = + "ERROR: \'" + process_address_strs[index] + "\' is not a valid network endpoint address\n"; if (process_address_strs[index].find(":tls") != std::string::npos) error += " Do not include the `:tls' suffix when naming a process\n"; return ManagementAPIError::toJsonString(false, "coordinators", error); diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index af78842214..16c71357c3 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -932,8 +932,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { if (workers.size() > old_coordinators_processes.size()) { loop { auto worker = deterministicRandom()->randomChoice(workers); - new_coordinator_process = - worker.address.toString(); + new_coordinator_process = worker.address.toString(); if (std::find(old_coordinators_processes.begin(), old_coordinators_processes.end(), worker.address.toString()) == old_coordinators_processes.end()) { break; @@ -947,6 +946,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { break; } catch (Error& e) { wait(tx->onError(e)); + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); } } TraceEvent(SevDebug, "CoordinatorsManualChange") @@ -991,6 +991,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { break; } else { wait(tx->onError(e)); + wait(delay(1.0)); } } } @@ -1015,6 +1016,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { tx->reset(); } catch (Error& e) { wait(tx->onError(e)); + wait(delay(1.0)); } } } @@ -1036,13 +1038,15 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); // special_key_space_management_api_error_msg schema validation + TraceEvent(SevDebug, "CoordinatorsAutoChange").detail("SKSErrorMessage", valueObj["message"].get_str()); ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); ASSERT(valueObj["command"].get_str() == "coordinators" && !valueObj["retriable"].get_bool()); - ASSERT(valueObj["message"].get_str() == "No change (existing configuration satisfies request)"); + // ASSERT(valueObj["message"].get_str() == "No change (existing configuration satisfies request)"); tx->reset(); break; } else { wait(tx->onError(e)); + wait(delay(1.0)); } } } From 731ee8a1217a92be3fd22e587523117c04c25096 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Tue, 16 Feb 2021 13:01:37 -0800 Subject: [PATCH 07/13] Update ChangeConfig to test coordinators special keys --- fdbclient/ManagementAPI.actor.cpp | 20 +-- fdbclient/ManagementAPI.actor.h | 3 +- fdbclient/SpecialKeySpace.actor.cpp | 9 +- fdbserver/workloads/ChangeConfig.actor.cpp | 98 +++++++++++++- .../SpecialKeySpaceCorrectness.actor.cpp | 120 +++++++++++------- 5 files changed, 177 insertions(+), 73 deletions(-) diff --git a/fdbclient/ManagementAPI.actor.cpp b/fdbclient/ManagementAPI.actor.cpp index 542ef497b1..3e30d87d89 100644 --- a/fdbclient/ManagementAPI.actor.cpp +++ b/fdbclient/ManagementAPI.actor.cpp @@ -1009,7 +1009,8 @@ ACTOR Future> getCoordinators( Database cx ) { } } -ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, std::vector* desiredCoordinators, int* retries, int* notEnoughMachineResults) { +ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, + std::vector* desiredCoordinators) { tr->setOption( FDBTransactionOptions::LOCK_AWARE ); tr->setOption( FDBTransactionOptions::USE_PROVISIONAL_PROXIES ); tr->setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE ); @@ -1028,24 +1029,19 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, *desiredCoordinators = _desiredCoordinators; } - if(result == CoordinatorsResult::NOT_ENOUGH_MACHINES && *notEnoughMachineResults < 1) { - //we could get not_enough_machines if we happen to see the database while the cluster controller is updating the worker list, so make sure it happens twice before returning a failure - (*notEnoughMachineResults)++; - wait( delay(1.0) ); - tr->reset(); - return Optional(); - } if (result != CoordinatorsResult::SUCCESS) return result; + if (!desiredCoordinators->size()) return CoordinatorsResult::INVALID_NETWORK_ADDRESSES; + std::sort(desiredCoordinators->begin(), desiredCoordinators->end()); std::string newName = change->getDesiredClusterKeyName(); if (newName.empty()) newName = old.clusterKeyName().toString(); if ( old.coordinators() == *desiredCoordinators && old.clusterKeyName() == newName) - return *retries ? CoordinatorsResult::SUCCESS : CoordinatorsResult::SAME_NETWORK_ADDRESSES; + return CoordinatorsResult::SAME_NETWORK_ADDRESSES; state ClusterConnectionString conn( *desiredCoordinators, StringRef( newName + ':' + deterministicRandom()->randomAlphaNumeric( 32 ) ) ); @@ -1061,12 +1057,6 @@ ACTOR Future> changeQuorumChecker(Transaction* tr, } } - TraceEvent("AttemptingQuorumChange") - .detail("FromCS", old.toString()) - .detail("ToCS", conn.toString()) - .detail("OldClusterDescription", old.clusterKeyName()) - .detail("NewClusterDescription", conn.clusterKeyName()); - vector>> leaderServers; ClientCoordinators coord( Reference( new ClusterConnectionFile( conn ) ) ); for( int i = 0; i < coord.clientLeaderServers.size(); i++ ) diff --git a/fdbclient/ManagementAPI.actor.h b/fdbclient/ManagementAPI.actor.h index e013570157..bb3c86ab72 100644 --- a/fdbclient/ManagementAPI.actor.h +++ b/fdbclient/ManagementAPI.actor.h @@ -144,7 +144,8 @@ struct IQuorumChange : ReferenceCounted { }; // Change to use the given set of coordination servers -ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, std::vector* desiredCoordinators, int* retries, int* notEnoughMachineResults); +ACTOR Future> changeQuorumChecker(Transaction* tr, Reference change, + std::vector* desiredCoordinators); ACTOR Future changeQuorum(Database cx, Reference change); Reference autoQuorumChange(int desired = -1); Reference noQuorumChange(); diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 9d544d37a6..271c8b81e4 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1411,9 +1411,7 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite state std::vector addressesVec; state std::vector process_address_strs; state Optional msg; - state int retry = 0; state int index; - state int notEnoughMachineResults; state bool parse_error = false; // check update for cluster_description @@ -1485,14 +1483,14 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite .detail("Auto", auto_option.first) .detail("Description", entry.first ? entry.second.get().toString() : ""); - Optional r = - wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec, &retry, ¬EnoughMachineResults)); + Optional r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec)); TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish") .detail("Result", r.present() ? int(r.get()) : -1); // -1 means success if (r.present()) { auto res = r.get(); std::string error_msg; + bool retriable = false; if (res == CoordinatorsResult::INVALID_NETWORK_ADDRESSES) { error_msg = "The specified network addresses are invalid"; } else if (res == CoordinatorsResult::SAME_NETWORK_ADDRESSES) { @@ -1505,6 +1503,7 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite error_msg = "The database is in an unexpected state from which changing coordinators might be unsafe"; } else if (res == CoordinatorsResult::COORDINATOR_UNREACHABLE) { error_msg = "One of the specified coordinators is unreachable"; + retriable = true; } else if (res == CoordinatorsResult::NOT_ENOUGH_MACHINES) { error_msg = "Too few fdbserver machines to provide coordination at the current redundancy level"; } else if (res == CoordinatorsResult::SUCCESS) { @@ -1512,7 +1511,7 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite } else { ASSERT(false); } - msg = ManagementAPIError::toJsonString(false, "coordinators", error_msg); + msg = ManagementAPIError::toJsonString(retriable, "coordinators", error_msg); } return msg; } diff --git a/fdbserver/workloads/ChangeConfig.actor.cpp b/fdbserver/workloads/ChangeConfig.actor.cpp index 7139c38a51..72f5809793 100644 --- a/fdbserver/workloads/ChangeConfig.actor.cpp +++ b/fdbserver/workloads/ChangeConfig.actor.cpp @@ -24,6 +24,7 @@ #include "fdbclient/ManagementAPI.actor.h" #include "fdbserver/workloads/workloads.actor.h" #include "fdbrpc/simulator.h" +#include "fdbclient/Schemas.h" #include "flow/actorcompiler.h" // This must be the last #include. struct ChangeConfigWorkload : TestWorkload { @@ -65,9 +66,9 @@ struct ChangeConfigWorkload : TestWorkload { TraceEvent("WaitForReplicasExtraEnd"); } if (self->networkAddresses.size()) { if (self->networkAddresses == "auto") - wait(success(changeQuorum(extraDB, autoQuorumChange()))); + wait(CoordinatorsChangeActor(extraDB, self, true)); else - wait(success(changeQuorum(extraDB, specifiedQuorumChange(NetworkAddress::parseList(self->networkAddresses))))); + wait(CoordinatorsChangeActor(extraDB, self)); } wait(delay(5*deterministicRandom()->random01())); } @@ -91,9 +92,9 @@ struct ChangeConfigWorkload : TestWorkload { } if( self->networkAddresses.size() ) { if (self->networkAddresses == "auto") - wait(success( changeQuorum( cx, autoQuorumChange() ) )); + wait(CoordinatorsChangeActor(cx, self, true)); else - wait(success( changeQuorum( cx, specifiedQuorumChange(NetworkAddress::parseList( self->networkAddresses )) ) )); + wait(CoordinatorsChangeActor(cx, self)); } if(!extraConfigureBefore) { @@ -102,6 +103,95 @@ struct ChangeConfigWorkload : TestWorkload { return Void(); } + + ACTOR static Future CoordinatorsChangeActor(Database cx, ChangeConfigWorkload* self, + bool autoChange = false) { + state ReadYourWritesTransaction tr(cx); + state int notEnoughMachineResults = 0; // Retry for the second time if we first get this result + // state std::vector desiredCoordinators; // the desired coordinators' network addresses + state std::string desiredCoordinatorsKey; // comma separated + if (autoChange) { // if auto, we first get the desired addresses, which is not changed in the following retries + loop { + try { + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + Optional currentKey = wait(tr.get(coordinatorsKey)); + + if (!currentKey.present()) return Void(); // Someone deleted this key entirely? + + ClusterConnectionString old(currentKey.get().toString()); + if (cx->getConnectionFile() && old.clusterKeyName().toString() != + cx->getConnectionFile()->getConnectionString().clusterKeyName()) + return Void(); // Someone changed the "name" of the database?? + + state CoordinatorsResult result = CoordinatorsResult::SUCCESS; + if (!desiredCoordinatorsKey.size()) { + std::vector _desiredCoordinators = + wait(autoQuorumChange()->getDesiredCoordinators( + &tr.getTransaction(), old.coordinators(), + Reference(new ClusterConnectionFile(old)), result)); + for (const auto& address : _desiredCoordinators) { + desiredCoordinatorsKey += desiredCoordinatorsKey.size() ? "," : ""; + desiredCoordinatorsKey += address.toString(); + } + } + + if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES && notEnoughMachineResults < 1) { + // we could get not_enough_machines if we happen to see the database while the cluster + // controller is updating the worker list, so make sure it happens twice before returning a + // failure + notEnoughMachineResults++; + wait(delay(1.0)); + tr.reset(); + continue; + } + if (result != CoordinatorsResult::SUCCESS) return Void(); + tr.reset(); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + } else { + desiredCoordinatorsKey = self->networkAddresses; + } + loop { + try { + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.set(LiteralStringRef("processes") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")), + Value(desiredCoordinatorsKey)); + TraceEvent(SevDebug, "CoordinatorsChangeBeforeCommit") + .detail("Auto", autoChange) + .detail("NewCoordinatorsKey", describe(desiredCoordinatorsKey)); + wait(tr.commit()); + ASSERT(false); + } catch (Error& e) { + state Error err(e); + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tr.get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + TraceEvent(SevDebug, "CoordinatorsChangeError") + .detail("Auto", autoChange) + .detail("ErrorMessage", valueObj["message"].get_str()); + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "coordinators"); + break; + } else { + wait(tr.onError(err)); + } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + } + } + return Void(); + } }; WorkloadFactory ChangeConfigWorkloadFactory("ChangeConfig"); diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index 16c71357c3..a4669d309f 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -27,6 +27,8 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" +#include "flow/Knobs.h" +#include "flow/Trace.h" #include "flow/actorcompiler.h" struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { @@ -440,11 +442,12 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { wait(tx->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces/"), LiteralStringRef("\xff\xff/worker_interfaces0")), CLIENT_KNOBS->TOO_MANY)); - // We should have at least 1 process in the cluster - ASSERT(result.size()); - state KeyValueRef entry = deterministicRandom()->randomChoice(result); - Optional singleRes = wait(tx->get(entry.key)); - ASSERT(singleRes.present() && singleRes.get() == entry.value); + // Note: there's possibility we get zero workers + if (result.size()) { + state KeyValueRef entry = deterministicRandom()->randomChoice(result); + Optional singleRes = wait(tx->get(entry.key)); + ASSERT(singleRes.present() && singleRes.get() == entry.value); + } tx->reset(); } catch (Error& e) { wait(tx->onError(e)); @@ -763,7 +766,11 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { Key("process/class_source/" + address) .withPrefix( SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin))); - ASSERT(class_source.present() && class_source.get() == LiteralStringRef("set_class")); + TraceEvent(SevDebug, "SetClassSourceDebug") + .detail("Present", class_source.present()) + .detail("ClassSource", class_source.present() ? class_source.get().toString() : "__Nothing"); + // Very rarely, we get an empty worker list, thus no class_source data + if (class_source.present()) ASSERT(class_source.get() == LiteralStringRef("set_class")); tx->reset(); } else { // If no worker process returned, skip the test @@ -968,11 +975,10 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")), Value(new_cluster_description)); wait(tx->commit()); - tx->reset(); - break; + ASSERT(false); } catch (Error& e) { TraceEvent(SevDebug, "CoordinatorsManualChange").error(e); - // if we repeat doing the change, we will get this error: + // if we repeat doing the change, we will get the error: // CoordinatorsResult::SAME_NETWORK_ADDRESSES if (e.code() == error_code_special_keys_api_failure) { Optional errorMsg = @@ -983,16 +989,21 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); // special_key_space_management_api_error_msg schema validation ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); - ASSERT(valueObj["command"].get_str() == "coordinators" && - !valueObj["retriable"].get_bool()); - ASSERT(valueObj["message"].get_str() == - "No change (existing configuration satisfies request)"); - tx->reset(); - break; + TraceEvent(SevDebug, "CoordinatorsManualChange") + .detail("ErrorMessage", valueObj["message"].get_str()); + ASSERT(valueObj["command"].get_str() == "coordinators"); + if (valueObj["retriable"].get_bool()) { // coordinators not reachable, retry + tx->reset(); + } else { + ASSERT(valueObj["message"].get_str() == + "No change (existing configuration satisfies request)"); + tx->reset(); + break; + } } else { wait(tx->onError(e)); - wait(delay(1.0)); } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); } } // change successful, now check it is already changed @@ -1010,43 +1021,56 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { new_coordinator_process == address_str); } // verify the cluster decription - TraceEvent(SevDebug, "CoordinatorsManualChange") - .detail("NewClsuterDescription", cs.clusterKeyName()); ASSERT(new_cluster_description == cs.clusterKeyName().toString()); tx->reset(); } catch (Error& e) { wait(tx->onError(e)); - wait(delay(1.0)); + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); } - } - } - // test coordinators' "auto" option - loop { - try { - tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); - tx->set(SpecialKeySpace::getManagementApiCommandOptionSpecialKey("coordinators", "auto"), ValueRef()); - wait(tx->commit()); // if an "auto" change happened, the commit may or may not succeed - tx->reset(); - } catch (Error& e) { - TraceEvent(SevDebug, "CoordinatorsAutoChange").error(e); - // if we repeat doing "auto" change, we will get this error: CoordinatorsResult::SAME_NETWORK_ADDRESSES - if (e.code() == error_code_special_keys_api_failure) { - Optional errorMsg = - wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); - ASSERT(errorMsg.present()); - std::string errorStr; - auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); - auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); - // special_key_space_management_api_error_msg schema validation - TraceEvent(SevDebug, "CoordinatorsAutoChange").detail("SKSErrorMessage", valueObj["message"].get_str()); - ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); - ASSERT(valueObj["command"].get_str() == "coordinators" && !valueObj["retriable"].get_bool()); - // ASSERT(valueObj["message"].get_str() == "No change (existing configuration satisfies request)"); - tx->reset(); - break; - } else { - wait(tx->onError(e)); - wait(delay(1.0)); + // change back to original settings + loop { + try { + std::string new_processes_key; + tx->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + for (const auto& address : old_coordinators_processes) { + new_processes_key += new_processes_key.size() ? "," : ""; + new_processes_key += address; + } + tx->set(LiteralStringRef("processes") + .withPrefix(SpecialKeySpace::getManagementApiCommandPrefix("coordinators")), + Value(new_processes_key)); + wait(tx->commit()); + ASSERT(false); + } catch (Error& e) { + TraceEvent(SevDebug, "CoordinatorsManualChangeRevert").error(e); + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tx->get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + TraceEvent(SevDebug, "CoordinatorsManualChangeRevert") + .detail("ErrorMessage", valueObj["message"].get_str()); + ASSERT(valueObj["command"].get_str() == "coordinators"); + if (valueObj["retriable"].get_bool()) { + tx->reset(); + } else if (valueObj["message"].get_str() == + "No change (existing configuration satisfies request)") { + tx->reset(); + break; + } else { + TraceEvent(SevError, "CoordinatorsManualChangeRevert") + .detail("UnexpectedError", valueObj["message"].get_str()); + throw special_keys_api_failure(); + } + } else { + wait(tx->onError(e)); + } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); + } } } } From 9dd94499f24bb87c994c052097c6b3d44bc8acb6 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Tue, 16 Feb 2021 13:06:25 -0800 Subject: [PATCH 08/13] Update traces --- fdbclient/SpecialKeySpace.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 271c8b81e4..cde1e6cdc8 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1486,7 +1486,7 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite Optional r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec)); TraceEvent(SevDebug, "SKSChangeCoordinatorsFinish") - .detail("Result", r.present() ? int(r.get()) : -1); // -1 means success + .detail("Result", r.present() ? static_cast(r.get()) : -1); // -1 means success if (r.present()) { auto res = r.get(); std::string error_msg; From fca7f753f81e51695aba059b4c345cd097b6510a Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Tue, 16 Feb 2021 13:46:45 -0800 Subject: [PATCH 09/13] Remove special key for coordinators' 'auto' option --- fdbclient/SpecialKeySpace.actor.cpp | 32 ++++++----------------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index cde1e6cdc8..1fb31aa7d8 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -71,7 +71,7 @@ std::unordered_map SpecialKeySpace::managementApiCommandT .withPrefix(moduleToBoundary[MODULE::CONFIGURATION].begin) } }; -std::set SpecialKeySpace::options = { "excluded/force", "failed/force", "coordinators/auto" }; +std::set SpecialKeySpace::options = { "excluded/force", "failed/force" }; std::set SpecialKeySpace::tracingOptions = { kTracingTransactionIdKey, kTracingTokenKey }; @@ -493,24 +493,11 @@ KeyRange SpecialKeySpace::decode(const KeyRangeRef& kr) { return KeyRangeRef(begin->value()->decode(kr.begin), begin->value()->decode(kr.end)); } -// Sometimes, we need to force impl::commit to execute without write to the underlying key range -void precommitUpdate(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw, - std::set& writeModulePtrs) { - // if "coordinators/auto" is set, make sure we execute CoordinatorsImpl::commit - Key option_key = SpecialKeySpace::getManagementApiCommandOptionSpecialKey("coordinators", "auto"); - auto entry = ryw->getSpecialKeySpaceWriteMap()[option_key]; - if (entry.first) { - writeModulePtrs.insert( - sks->getRWImpls().rangeContaining(SpecialKeySpace::getManagementApiCommandPrefix("coordinators"))->value()); - } -} - ACTOR Future commitActor(SpecialKeySpace* sks, ReadYourWritesTransaction* ryw) { state RangeMap>, KeyRangeRef>::Ranges ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(specialKeys); state RangeMap>, KeyRangeRef>::iterator iter = ranges.begin(); state std::set writeModulePtrs; - precommitUpdate(sks, ryw, writeModulePtrs); while (iter != ranges.end()) { std::pair> entry = iter->value(); if (entry.first) { @@ -1449,17 +1436,11 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite } } - // check auto option - auto auto_option = ryw->getSpecialKeySpaceWriteMap()[SpecialKeySpace::getManagementApiCommandOptionSpecialKey( - "coordinators", "auto")]; - if (auto_option.first) { - change = autoQuorumChange(); - } else { - if (addressesVec.size()) - change = specifiedQuorumChange(addressesVec); - else - change = noQuorumChange(); - } + + if (addressesVec.size()) + change = specifiedQuorumChange(addressesVec); + else + change = noQuorumChange(); // check update for cluster_description Key cluster_decription_key = LiteralStringRef("cluster_description").withPrefix(kr.begin); @@ -1480,7 +1461,6 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite TraceEvent(SevDebug, "SKSChangeCoordinatorsStart") .detail("NewAddresses", describe(addressesVec)) - .detail("Auto", auto_option.first) .detail("Description", entry.first ? entry.second.get().toString() : ""); Optional r = wait(changeQuorumChecker(&ryw->getTransaction(), change, &addressesVec)); From e06f4b4c297606c4856ad2094a5e916db739011f Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Tue, 16 Feb 2021 22:34:23 -0800 Subject: [PATCH 10/13] Update test when we get an empty worker list --- fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index a4669d309f..317bcac5ca 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -446,7 +446,7 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { if (result.size()) { state KeyValueRef entry = deterministicRandom()->randomChoice(result); Optional singleRes = wait(tx->get(entry.key)); - ASSERT(singleRes.present() && singleRes.get() == entry.value); + if (singleRes.present()) ASSERT(singleRes.get() == entry.value); } tx->reset(); } catch (Error& e) { From f3a849de107fa295be0842deb55bfe1aed151a85 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Tue, 16 Feb 2021 23:55:58 -0800 Subject: [PATCH 11/13] Update ChangeConfig test for auto coordinators change --- fdbclient/NativeAPI.actor.cpp | 5 ++ fdbclient/SpecialKeySpace.actor.cpp | 48 +++++++++++++++- fdbclient/SpecialKeySpace.actor.h | 6 ++ fdbserver/workloads/ChangeConfig.actor.cpp | 67 ++++++++++------------ 4 files changed, 87 insertions(+), 39 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 949915e70f..4fda82740d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -955,6 +955,11 @@ DatabaseContext::DatabaseContext(Reference( KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0")) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique( + singleKeyRange(LiteralStringRef("auto_coordinators")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 1fb31aa7d8..1c53a4b0b7 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1436,7 +1436,6 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite } } - if (addressesVec.size()) change = specifiedQuorumChange(addressesVec); else @@ -1508,3 +1507,50 @@ void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) return throwSpecialKeyApiFailure(ryw, "coordinators", "Clear operation is meaningless thus forbidden for coordinators"); } + +CoordinatorsAutoImpl::CoordinatorsAutoImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {} + +ACTOR static Future> CoordinatorsAutoImplActor(ReadYourWritesTransaction* ryw, + KeyRangeRef kr) { + state Standalone res; + state std::string autoCoordinatorsKey; + state Transaction& tr = ryw->getTransaction(); + + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + Optional currentKey = wait(tr.get(coordinatorsKey)); + + if (!currentKey.present()) { + ryw->setSpecialKeySpaceErrorMsg( + ManagementAPIError::toJsonString(false, "auto_coordinators", "The coordinator key does not exist")); + throw special_keys_api_failure(); + } + state ClusterConnectionString old(currentKey.get().toString()); + state CoordinatorsResult result = CoordinatorsResult::SUCCESS; + + std::vector _desiredCoordinators = wait(autoQuorumChange()->getDesiredCoordinators( + &tr, old.coordinators(), Reference(new ClusterConnectionFile(old)), result)); + + if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES) { + // we could get not_enough_machines if we happen to see the database while the cluster controller is updating + // the worker list, so make sure it happens twice before returning a failure + ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString( + true, "auto_coordinators", "The auto change attempt did not get enough machines, please try again")); + throw special_keys_api_failure(); + } + + for (const auto& address : _desiredCoordinators) { + autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : ""; + autoCoordinatorsKey += address.toString(); + } + res.push_back_deep(res.arena(), KeyValueRef(kr.begin, Value(autoCoordinatorsKey))); + return res; +} + +Future> CoordinatorsAutoImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr) const { + // single key range, the queried range should always be the same as the underlying range + ASSERT(kr == getKeyRange()); + return CoordinatorsAutoImplActor(ryw, kr); +} diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 3cbd08ef85..793a070e1e 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -341,5 +341,11 @@ public: void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; }; +class CoordinatorsAutoImpl : public SpecialKeyRangeReadImpl { +public: + explicit CoordinatorsAutoImpl(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; +}; + #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/workloads/ChangeConfig.actor.cpp b/fdbserver/workloads/ChangeConfig.actor.cpp index 72f5809793..74a3e43979 100644 --- a/fdbserver/workloads/ChangeConfig.actor.cpp +++ b/fdbserver/workloads/ChangeConfig.actor.cpp @@ -108,50 +108,41 @@ struct ChangeConfigWorkload : TestWorkload { bool autoChange = false) { state ReadYourWritesTransaction tr(cx); state int notEnoughMachineResults = 0; // Retry for the second time if we first get this result - // state std::vector desiredCoordinators; // the desired coordinators' network addresses state std::string desiredCoordinatorsKey; // comma separated - if (autoChange) { // if auto, we first get the desired addresses, which is not changed in the following retries + if (autoChange) { // if auto, we first get the desired addresses by read \xff\xff/management/auto_coordinators loop { try { - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); - tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - Optional currentKey = wait(tr.get(coordinatorsKey)); - - if (!currentKey.present()) return Void(); // Someone deleted this key entirely? - - ClusterConnectionString old(currentKey.get().toString()); - if (cx->getConnectionFile() && old.clusterKeyName().toString() != - cx->getConnectionFile()->getConnectionString().clusterKeyName()) - return Void(); // Someone changed the "name" of the database?? - - state CoordinatorsResult result = CoordinatorsResult::SUCCESS; - if (!desiredCoordinatorsKey.size()) { - std::vector _desiredCoordinators = - wait(autoQuorumChange()->getDesiredCoordinators( - &tr.getTransaction(), old.coordinators(), - Reference(new ClusterConnectionFile(old)), result)); - for (const auto& address : _desiredCoordinators) { - desiredCoordinatorsKey += desiredCoordinatorsKey.size() ? "," : ""; - desiredCoordinatorsKey += address.toString(); - } - } - - if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES && notEnoughMachineResults < 1) { - // we could get not_enough_machines if we happen to see the database while the cluster - // controller is updating the worker list, so make sure it happens twice before returning a - // failure - notEnoughMachineResults++; - wait(delay(1.0)); - tr.reset(); - continue; - } - if (result != CoordinatorsResult::SUCCESS) return Void(); + Optional newCoordinatorsKey = wait(tr.get( + LiteralStringRef("auto_coordinators") + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + ASSERT(newCoordinatorsKey.present()); + desiredCoordinatorsKey = newCoordinatorsKey.get().toString(); tr.reset(); break; } catch (Error& e) { - wait(tr.onError(e)); + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tr.get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + TraceEvent(SevDebug, "GetAutoCoordinatorsChange") + .detail("ErrorMessage", valueObj["message"].get_str()); + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "auto_coordinators"); + if (valueObj["retriable"].get_bool() && notEnoughMachineResults < 1) { + notEnoughMachineResults++; + wait(delay(1.0)); + tr.reset(); + } else { + break; + } + } else { + wait(tr.onError(e)); + } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); } } } else { From c27c2756ec3ae1b477f391af875492d601b3af67 Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Wed, 17 Feb 2021 13:23:46 -0800 Subject: [PATCH 12/13] Remove unnecessary header added by IDE --- fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp index 317bcac5ca..9464914012 100644 --- a/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp +++ b/fdbserver/workloads/SpecialKeySpaceCorrectness.actor.cpp @@ -27,8 +27,6 @@ #include "fdbclient/SpecialKeySpace.actor.h" #include "fdbserver/TesterInterface.actor.h" #include "fdbserver/workloads/workloads.actor.h" -#include "flow/Knobs.h" -#include "flow/Trace.h" #include "flow/actorcompiler.h" struct SpecialKeySpaceCorrectnessWorkload : TestWorkload { From 70098b752a2eb874eb4361a08c1fe9b604d1218a Mon Sep 17 00:00:00 2001 From: Chaoguang Lin Date: Thu, 18 Feb 2021 14:23:51 -0800 Subject: [PATCH 13/13] Fix typo, solve comments --- fdbclient/SpecialKeySpace.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 1c53a4b0b7..fdb8ded1fd 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1382,7 +1382,7 @@ Future> CoordinatorsImpl::getRange(ReadYourWritesTran std::sort(coordinator_processes.begin(), coordinator_processes.end(), [](const NetworkAddress& lhs, const NetworkAddress& rhs) { return lhs.toString() < rhs.toString(); }); std::string processes_str; - for (auto& w : coordinator_processes) { + for (const auto& w : coordinator_processes) { if (processes_str.size()) processes_str += ","; processes_str += w.toString(); } @@ -1422,7 +1422,7 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite else addressesVec.push_back(a); } catch (Error& e) { - TraceEvent(SevDebug, "SpeicalKeysNetworkParseError").error(e); + TraceEvent(SevDebug, "SpecialKeysNetworkParseError").error(e); parse_error = true; }