Update ChangeConfig test for auto coordinators change
This commit is contained in:
parent
e06f4b4c29
commit
f3a849de10
|
@ -955,6 +955,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<ClusterConnectionF
|
|||
std::make_unique<CoordinatorsImpl>(
|
||||
KeyRangeRef(LiteralStringRef("coordinators/"), LiteralStringRef("coordinators0"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::CONFIGURATION).begin)));
|
||||
registerSpecialKeySpaceModule(
|
||||
SpecialKeySpace::MODULE::MANAGEMENT, SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
std::make_unique<CoordinatorsAutoImpl>(
|
||||
singleKeyRange(LiteralStringRef("auto_coordinators"))
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
}
|
||||
if (apiVersionAtLeast(630)) {
|
||||
registerSpecialKeySpaceModule(SpecialKeySpace::MODULE::TRANSACTION, SpecialKeySpace::IMPLTYPE::READONLY,
|
||||
|
|
|
@ -1436,7 +1436,6 @@ ACTOR static Future<Optional<std::string>> coordinatorsCommitActor(ReadYourWrite
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (addressesVec.size())
|
||||
change = specifiedQuorumChange(addressesVec);
|
||||
else
|
||||
|
@ -1508,3 +1507,50 @@ void CoordinatorsImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key)
|
|||
return throwSpecialKeyApiFailure(ryw, "coordinators",
|
||||
"Clear operation is meaningless thus forbidden for coordinators");
|
||||
}
|
||||
|
||||
CoordinatorsAutoImpl::CoordinatorsAutoImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {}
|
||||
|
||||
ACTOR static Future<Standalone<RangeResultRef>> CoordinatorsAutoImplActor(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) {
|
||||
state Standalone<RangeResultRef> res;
|
||||
state std::string autoCoordinatorsKey;
|
||||
state Transaction& tr = ryw->getTransaction();
|
||||
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
Optional<Value> currentKey = wait(tr.get(coordinatorsKey));
|
||||
|
||||
if (!currentKey.present()) {
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false, "auto_coordinators", "The coordinator key does not exist"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
state ClusterConnectionString old(currentKey.get().toString());
|
||||
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
|
||||
|
||||
std::vector<NetworkAddress> _desiredCoordinators = wait(autoQuorumChange()->getDesiredCoordinators(
|
||||
&tr, old.coordinators(), Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result));
|
||||
|
||||
if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES) {
|
||||
// we could get not_enough_machines if we happen to see the database while the cluster controller is updating
|
||||
// the worker list, so make sure it happens twice before returning a failure
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
true, "auto_coordinators", "The auto change attempt did not get enough machines, please try again"));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
|
||||
for (const auto& address : _desiredCoordinators) {
|
||||
autoCoordinatorsKey += autoCoordinatorsKey.size() ? "," : "";
|
||||
autoCoordinatorsKey += address.toString();
|
||||
}
|
||||
res.push_back_deep(res.arena(), KeyValueRef(kr.begin, Value(autoCoordinatorsKey)));
|
||||
return res;
|
||||
}
|
||||
|
||||
Future<Standalone<RangeResultRef>> CoordinatorsAutoImpl::getRange(ReadYourWritesTransaction* ryw,
|
||||
KeyRangeRef kr) const {
|
||||
// single key range, the queried range should always be the same as the underlying range
|
||||
ASSERT(kr == getKeyRange());
|
||||
return CoordinatorsAutoImplActor(ryw, kr);
|
||||
}
|
||||
|
|
|
@ -341,5 +341,11 @@ public:
|
|||
void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override;
|
||||
};
|
||||
|
||||
class CoordinatorsAutoImpl : public SpecialKeyRangeReadImpl {
|
||||
public:
|
||||
explicit CoordinatorsAutoImpl(KeyRangeRef kr);
|
||||
Future<Standalone<RangeResultRef>> getRange(ReadYourWritesTransaction* ryw, KeyRangeRef kr) const override;
|
||||
};
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
#endif
|
||||
|
|
|
@ -108,50 +108,41 @@ struct ChangeConfigWorkload : TestWorkload {
|
|||
bool autoChange = false) {
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state int notEnoughMachineResults = 0; // Retry for the second time if we first get this result
|
||||
// state std::vector<NetworkAddress> desiredCoordinators; // the desired coordinators' network addresses
|
||||
state std::string desiredCoordinatorsKey; // comma separated
|
||||
if (autoChange) { // if auto, we first get the desired addresses, which is not changed in the following retries
|
||||
if (autoChange) { // if auto, we first get the desired addresses by read \xff\xff/management/auto_coordinators
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
tr.setOption(FDBTransactionOptions::USE_PROVISIONAL_PROXIES);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
Optional<Value> currentKey = wait(tr.get(coordinatorsKey));
|
||||
|
||||
if (!currentKey.present()) return Void(); // Someone deleted this key entirely?
|
||||
|
||||
ClusterConnectionString old(currentKey.get().toString());
|
||||
if (cx->getConnectionFile() && old.clusterKeyName().toString() !=
|
||||
cx->getConnectionFile()->getConnectionString().clusterKeyName())
|
||||
return Void(); // Someone changed the "name" of the database??
|
||||
|
||||
state CoordinatorsResult result = CoordinatorsResult::SUCCESS;
|
||||
if (!desiredCoordinatorsKey.size()) {
|
||||
std::vector<NetworkAddress> _desiredCoordinators =
|
||||
wait(autoQuorumChange()->getDesiredCoordinators(
|
||||
&tr.getTransaction(), old.coordinators(),
|
||||
Reference<ClusterConnectionFile>(new ClusterConnectionFile(old)), result));
|
||||
for (const auto& address : _desiredCoordinators) {
|
||||
desiredCoordinatorsKey += desiredCoordinatorsKey.size() ? "," : "";
|
||||
desiredCoordinatorsKey += address.toString();
|
||||
}
|
||||
}
|
||||
|
||||
if (result == CoordinatorsResult::NOT_ENOUGH_MACHINES && notEnoughMachineResults < 1) {
|
||||
// we could get not_enough_machines if we happen to see the database while the cluster
|
||||
// controller is updating the worker list, so make sure it happens twice before returning a
|
||||
// failure
|
||||
notEnoughMachineResults++;
|
||||
wait(delay(1.0));
|
||||
tr.reset();
|
||||
continue;
|
||||
}
|
||||
if (result != CoordinatorsResult::SUCCESS) return Void();
|
||||
Optional<Value> newCoordinatorsKey = wait(tr.get(
|
||||
LiteralStringRef("auto_coordinators")
|
||||
.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)));
|
||||
ASSERT(newCoordinatorsKey.present());
|
||||
desiredCoordinatorsKey = newCoordinatorsKey.get().toString();
|
||||
tr.reset();
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
if (e.code() == error_code_special_keys_api_failure) {
|
||||
Optional<Value> errorMsg =
|
||||
wait(tr.get(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::ERRORMSG).begin));
|
||||
ASSERT(errorMsg.present());
|
||||
std::string errorStr;
|
||||
auto valueObj = readJSONStrictly(errorMsg.get().toString()).get_obj();
|
||||
auto schema = readJSONStrictly(JSONSchemas::managementApiErrorSchema.toString()).get_obj();
|
||||
// special_key_space_management_api_error_msg schema validation
|
||||
TraceEvent(SevDebug, "GetAutoCoordinatorsChange")
|
||||
.detail("ErrorMessage", valueObj["message"].get_str());
|
||||
ASSERT(schemaMatch(schema, valueObj, errorStr, SevError, true));
|
||||
ASSERT(valueObj["command"].get_str() == "auto_coordinators");
|
||||
if (valueObj["retriable"].get_bool() && notEnoughMachineResults < 1) {
|
||||
notEnoughMachineResults++;
|
||||
wait(delay(1.0));
|
||||
tr.reset();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
wait(delay(FLOW_KNOBS->PREVENT_FAST_SPIN_DELAY));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue