diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 949915e70f..4fda82740d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -955,6 +955,11 @@ DatabaseContext::DatabaseContext(Reference( KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0")) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin))); + registerSpecialKeySpaceModule( + SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique( + singleKeyRange(LiteralStringRef("auto_coordinators")) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); } if (apiVersionAtLeast(630)) { registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY, diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 1fb31aa7d8..1c53a4b0b7 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1436,7 +1436,6 @@ ACTOR static Future> coordinatorsCommitActor(ReadYourWrite } } - if (addressesVec.size()) change = specifiedQuorumChange(addressesVec); else @@ -1508,3 +1507,50 @@ void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) return throwSpecialKeyApiFailure(ryw, "coordinators", "Clear operation is meaningless thus forbidden for coordinators"); } + +CoordinatorsAutoImpl::CoordinatorsAutoImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {} + +ACTOR static Future> CoordinatorsAutoImplActor(ReadYourWritesTransaction* ryw, + KeyRangeRef kr) { + state Standalone res; + state std::string autoCoordinatorsKey; + state Transaction& tr = ryw->getTransaction(); + + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + Optional currentKey = wait(tr.get(coordinatorsKey)); + + if 
(!currentKey.present()) { + ryw->setSpecialKeySpaceErrorMsg( + ManagementAPIError::toJsonString(false, "auto_coordinators", "The coordinator key does not exist")); + throw special_keys_api_failure(); + } + state ClusterConnectionString old(currentKey.get().toString()); + state CoordinatorsResult result = CoordinatorsResult::SUCCESS; + + std::vector _desiredCoordinators = wait(autoQuorumChange()->getDesiredCoordinators( + &tr, old.coordinators(), Reference(new ClusterConnectionFile(old)), result)); + + if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES) { + // we could get not_enough_machines if we happen to see the database while the cluster controller is updating + // the worker list; this read does not retry by itself, it fails with a "please try again" error for the caller + ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString( + true, "auto_coordinators", "The auto change attempt did not get enough machines, please try again")); + throw special_keys_api_failure(); + } + + for (const auto& address : _desiredCoordinators) { + autoCoordinatorsKey += autoCoordinatorsKey.size() ? 
"," : ""; + autoCoordinatorsKey += address.toString(); + } + res.push_back_deep(res.arena(), KeyValueRef(kr.begin, Value(autoCoordinatorsKey))); + return res; +} + +Future> CoordinatorsAutoImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr) const { + // single key range, the queried range should always be the same as the underlying range + ASSERT(kr == getKeyRange()); + return CoordinatorsAutoImplActor(ryw, kr); +} diff --git a/fdbclient/SpecialKeySpace.actor.h b/fdbclient/SpecialKeySpace.actor.h index 3cbd08ef85..793a070e1e 100644 --- a/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/SpecialKeySpace.actor.h @@ -341,5 +341,11 @@ public: void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; }; +class CoordinatorsAutoImpl : public SpecialKeyRangeReadImpl { +public: + explicit CoordinatorsAutoImpl(KeyRangeRef kr); + Future> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override; +}; + #include "flow/unactorcompiler.h" #endif diff --git a/fdbserver/workloads/ChangeConfig.actor.cpp b/fdbserver/workloads/ChangeConfig.actor.cpp index 72f5809793..74a3e43979 100644 --- a/fdbserver/workloads/ChangeConfig.actor.cpp +++ b/fdbserver/workloads/ChangeConfig.actor.cpp @@ -108,50 +108,41 @@ struct ChangeConfigWorkload : TestWorkload { bool autoChange = false) { state ReadYourWritesTransaction tr(cx); state int notEnoughMachineResults = 0; // Retry for the second time if we first get this result - // state std::vector desiredCoordinators; // the desired coordinators' network addresses state std::string desiredCoordinatorsKey; // comma separated - if (autoChange) { // if auto, we first get the desired addresses, which is not changed in the following retries + if (autoChange) { // if auto, we first get the desired addresses by read \xff\xff/management/auto_coordinators loop { try { - tr.setOption(FDBTransactionOptions::LOCK_AWARE); - tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES); - 
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); - tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - Optional currentKey = wait(tr.get(coordinatorsKey)); - - if (!currentKey.present()) return Void(); // Someone deleted this key entirely? - - ClusterConnectionString old(currentKey.get().toString()); - if (cx->getConnectionFile() && old.clusterKeyName().toString() != - cx->getConnectionFile()->getConnectionString().clusterKeyName()) - return Void(); // Someone changed the "name" of the database?? - - state CoordinatorsResult result = CoordinatorsResult::SUCCESS; - if (!desiredCoordinatorsKey.size()) { - std::vector _desiredCoordinators = - wait(autoQuorumChange()->getDesiredCoordinators( - &tr.getTransaction(), old.coordinators(), - Reference(new ClusterConnectionFile(old)), result)); - for (const auto& address : _desiredCoordinators) { - desiredCoordinatorsKey += desiredCoordinatorsKey.size() ? "," : ""; - desiredCoordinatorsKey += address.toString(); - } - } - - if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES && notEnoughMachineResults < 1) { - // we could get not_enough_machines if we happen to see the database while the cluster - // controller is updating the worker list, so make sure it happens twice before returning a - // failure - notEnoughMachineResults++; - wait(delay(1.0)); - tr.reset(); - continue; - } - if (result != CoordinatorsResult::SUCCESS) return Void(); + Optional newCoordinatorsKey = wait(tr.get( + LiteralStringRef("auto_coordinators") + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin))); + ASSERT(newCoordinatorsKey.present()); + desiredCoordinatorsKey = newCoordinatorsKey.get().toString(); tr.reset(); break; } catch (Error& e) { - wait(tr.onError(e)); + if (e.code() == error_code_special_keys_api_failure) { + Optional errorMsg = + wait(tr.get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin)); + ASSERT(errorMsg.present()); + std::string errorStr; + auto 
valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj(); + auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj(); + // special_key_space_management_api_error_msg schema validation + TraceEvent(SevDebug, "GetAutoCoordinatorsChange") + .detail("ErrorMessage", valueObj["message"].get_str()); + ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true)); + ASSERT(valueObj["command"].get_str() == "auto_coordinators"); + if (valueObj["retriable"].get_bool() && notEnoughMachineResults < 1) { + notEnoughMachineResults++; + wait(delay(1.0)); + tr.reset(); + } else { + break; + } + } else { + wait(tr.onError(e)); + } + wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY)); } } } else {