Refactor the register logic so that only two write transactions take place. The first is to the data cluster, the second to the management cluster.

This commit is contained in:
A.J. Beamon 2022-05-18 11:21:39 -07:00
parent e8e26c9f7c
commit 98c3813431
6 changed files with 140 additions and 155 deletions

View File

@ -160,8 +160,6 @@ ACTOR Future<bool> metaclusterGetCommand(Reference<IDatabase> db, std::vector<St
}
DataClusterMetadata metadata = wait(MetaclusterAPI::getCluster(db, tokens[2]));
printf(" registration state: %s\n",
DataClusterEntry::registrationStateToString(metadata.entry.registrationState).toString().c_str());
printf(" connection string: %s\n", metadata.connectionString.toString().c_str());
printf(" tenant group capacity: %d\n", metadata.entry.capacity.numTenantGroups);
printf(" allocated tenant groups: %d\n", metadata.entry.allocated.numTenantGroups);

View File

@ -85,6 +85,11 @@ public:
std::vector<NetworkAddress> coords;
std::vector<Hostname> hostnames;
bool operator==(const ClusterConnectionString& other) const noexcept {
return key == other.key && keyDesc == other.keyDesc && coords == other.coords && hostnames == other.hostnames;
}
bool operator!=(const ClusterConnectionString& other) const noexcept { return !(*this == other); }
private:
void parseConnString();
Key key, keyDesc;

View File

@ -58,38 +58,20 @@ struct Traceable<ClusterUsage> : std::true_type {
struct DataClusterEntry {
constexpr static FileIdentifier file_identifier = 929511;
enum class RegistrationState { UNKNOWN = 0, REGISTERING = 1, READY = 2 };
static RegistrationState stringToRegistrationState(StringRef str) {
if (str == "REGISTERING"_sr) {
return RegistrationState::REGISTERING;
} else if (str == "READY"_sr) {
return RegistrationState::READY;
} else {
return RegistrationState::UNKNOWN;
}
}
static StringRef registrationStateToString(RegistrationState state) {
if (state == RegistrationState::REGISTERING) {
return "REGISTERING"_sr;
} else if (state == RegistrationState::READY) {
return "READY"_sr;
} else {
return "UNKNOWN"_sr;
}
}
UID id;
ClusterUsage capacity;
ClusterUsage allocated;
RegistrationState registrationState = RegistrationState::REGISTERING;
DataClusterEntry() = default;
DataClusterEntry(ClusterUsage capacity) : capacity(capacity) {}
DataClusterEntry(UID id, ClusterUsage capacity, ClusterUsage allocated)
: id(id), capacity(capacity), allocated(allocated) {}
// Returns true if all configurable properties match
bool matchesConfiguration(DataClusterEntry const& other) const {
return id == other.id && capacity == other.capacity;
}
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
static DataClusterEntry decode(ValueRef const& value) {
DataClusterEntry entry;
@ -100,7 +82,7 @@ struct DataClusterEntry {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, capacity, allocated, registrationState);
serializer(ar, id, capacity, allocated);
}
};
@ -108,10 +90,12 @@ struct DataClusterRegistrationEntry {
constexpr static FileIdentifier file_identifier = 13448589;
ClusterName name;
UID metaclusterId;
UID id;
DataClusterRegistrationEntry() = default;
DataClusterRegistrationEntry(ClusterName name, UID id) : name(name), id(id) {}
DataClusterRegistrationEntry(ClusterName name, UID metaclusterId, UID id)
: name(name), metaclusterId(metaclusterId), id(id) {}
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
static DataClusterRegistrationEntry decode(ValueRef const& value) {
@ -123,7 +107,7 @@ struct DataClusterRegistrationEntry {
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, name, id);
serializer(ar, name, metaclusterId, id);
}
};

View File

@ -53,8 +53,12 @@ struct DataClusterMetadata {
DataClusterMetadata(DataClusterEntry const& entry, ClusterConnectionString const& connectionString)
: entry(entry), connectionString(connectionString) {}
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
Value encode(Arena& arena) {
bool matchesConfiguration(DataClusterMetadata const& other) const {
return entry.matchesConfiguration(other.entry) && connectionString == other.connectionString;
}
Value encode() const { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
Value encode(Arena& arena) const {
return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster()), arena);
}
static DataClusterMetadata decode(ValueRef const& value) {
@ -152,20 +156,14 @@ void updateClusterMetadata(Transaction tr,
}
ACTOR template <class Transaction>
Future<Void> managementClusterRegister(Transaction tr,
ClusterNameRef name,
std::string connectionString,
DataClusterEntry entry) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
if (name.startsWith("\xff"_sr)) {
throw invalid_cluster_name();
}
Future<std::pair<UID, bool>> managementClusterRegisterPrecheck(Transaction tr,
ClusterNameRef name,
Optional<DataClusterMetadata> metadata) {
state Future<Optional<DataClusterMetadata>> dataClusterMetadataFuture = tryGetClusterTransaction(tr, name);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
state typename transaction_future_type<Transaction, Optional<Value>>::type metaclusterIdFuture =
tr->get(clusterIdKey);
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
@ -173,27 +171,42 @@ Future<Void> managementClusterRegister(Transaction tr,
throw invalid_metacluster_operation();
}
Optional<DataClusterMetadata> dataClusterMetadata = wait(dataClusterMetadataFuture);
if (dataClusterMetadata.present()) {
if (dataClusterMetadata.get().entry.registrationState == DataClusterEntry::RegistrationState::REGISTERING &&
dataClusterMetadata.get().connectionString.toString() == connectionString &&
dataClusterMetadata.get().entry.capacity == entry.capacity) {
return Void();
} else {
throw cluster_already_exists();
}
state Optional<DataClusterMetadata> dataClusterMetadata = wait(dataClusterMetadataFuture);
if (dataClusterMetadata.present() &&
(!metadata.present() || !metadata.get().matchesConfiguration(dataClusterMetadata.get()))) {
throw cluster_already_exists();
}
entry.allocated = ClusterUsage();
Optional<Value> metaclusterIdValue = wait(safeThreadFutureToFuture(metaclusterIdFuture));
ASSERT(metaclusterIdValue.present());
tr->set(dataClusterMetadataKey, entry.encode());
tr->set(dataClusterConnectionRecordKey, connectionString);
return std::make_pair(BinaryReader::fromStringRef<UID>(metaclusterIdValue.get(), Unversioned()),
dataClusterMetadata.present());
}
ACTOR template <class Transaction>
Future<Void> managementClusterRegister(Transaction tr,
ClusterNameRef name,
ClusterConnectionString connectionString,
DataClusterEntry entry) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
std::pair<UID, bool> result =
wait(managementClusterRegisterPrecheck(tr, name, DataClusterMetadata(entry, connectionString)));
if (!result.second) {
entry.allocated = ClusterUsage();
tr->set(dataClusterMetadataKey, entry.encode());
tr->set(dataClusterConnectionRecordKey, connectionString.toString());
}
return Void();
}
ACTOR template <class Transaction>
Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name, UID clusterId) {
Future<UID> dataClusterRegister(Transaction tr, ClusterNameRef name, UID metaclusterId) {
state Future<std::map<TenantName, TenantMapEntry>> existingTenantsFuture =
ManagementAPI::listTenantsTransaction(tr, ""_sr, "\xff\xff"_sr, 1);
state typename transaction_future_type<Transaction, RangeResult>::type existingDataFuture =
@ -201,6 +214,19 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name, UID cluste
state typename transaction_future_type<Transaction, Optional<Value>>::type clusterRegistrationFuture =
tr->get(dataClusterRegistrationKey);
Optional<Value> storedClusterRegistration = wait(safeThreadFutureToFuture(clusterRegistrationFuture));
if (storedClusterRegistration.present()) {
DataClusterRegistrationEntry existingRegistration =
DataClusterRegistrationEntry::decode(storedClusterRegistration.get());
if (existingRegistration.name != name || existingRegistration.metaclusterId != metaclusterId) {
throw cluster_already_registered();
} else {
// We already successfully registered the cluster with these details, so there's nothing to do
return existingRegistration.id;
}
}
std::map<TenantName, TenantMapEntry> existingTenants = wait(safeThreadFutureToFuture(existingTenantsFuture));
if (!existingTenants.empty()) {
TraceEvent(SevWarn, "CannotRegisterClusterWithTenants").detail("ClusterName", name);
@ -213,20 +239,8 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name, UID cluste
throw cluster_not_empty();
}
Optional<Value> storedClusterRegistration = wait(safeThreadFutureToFuture(clusterRegistrationFuture));
if (storedClusterRegistration.present()) {
DataClusterRegistrationEntry existingRegistration =
DataClusterRegistrationEntry::decode(storedClusterRegistration.get());
if (existingRegistration.name != name || existingRegistration.id != clusterId) {
throw cluster_already_registered();
} else {
// We already successfully registered the cluster with these details, so there's nothing to do
return Void();
}
}
tr->set(dataClusterRegistrationKey, DataClusterRegistrationEntry(name, clusterId).encode());
state UID clusterId = deterministicRandom()->randomUniqueID();
tr->set(dataClusterRegistrationKey, DataClusterRegistrationEntry(name, metaclusterId, clusterId).encode());
std::vector<StringRef> tokens = { "tenant_mode=required"_sr };
@ -242,19 +256,7 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name, UID cluste
throw cluster_configuration_failure();
}
return Void();
}
ACTOR template <class Transaction>
Future<Void> finalizeRegistration(Transaction tr, ClusterNameRef name, UID clusterId) {
state Optional<DataClusterMetadata> dataClusterMetadata = wait(tryGetClusterTransaction(tr, name));
if (dataClusterMetadata.get().entry.id == clusterId &&
dataClusterMetadata.get().entry.registrationState == DataClusterEntry::RegistrationState::REGISTERING) {
dataClusterMetadata.get().entry.registrationState = DataClusterEntry::RegistrationState::READY;
updateClusterMetadata(tr, name, Optional<ClusterConnectionString>(), dataClusterMetadata.get().entry);
}
return Void();
return clusterId;
}
ACTOR template <class DB>
@ -266,14 +268,80 @@ Future<Void> registerCluster(Reference<DB> db,
throw invalid_cluster_name();
}
entry.id = deterministicRandom()->randomUniqueID();
state UID metaclusterId;
// Step 1: Record that we are registering the new cluster
// Step 1: Check for a conflicting cluster in the management cluster and get the metacluster ID
state Reference<typename DB::TransactionT> precheckTr = db->createTransaction();
loop {
try {
precheckTr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::pair<UID, bool> result =
wait(managementClusterRegisterPrecheck(precheckTr, name, Optional<DataClusterMetadata>()));
metaclusterId = result.first;
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(precheckTr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("RegisteredDataCluster")
.detail("ClusterName", name)
.detail("ClusterID", entry.id)
.detail("Capacity", entry.capacity)
.detail("Version", precheckTr->getCommittedVersion())
.detail("ConnectionString", connectionString.toString());
break;
} catch (Error& e) {
wait(safeThreadFutureToFuture(precheckTr->onError(e)));
}
}
// Step 2: Configure the data cluster as a subordinate cluster
state Reference<IDatabase> dataClusterDb =
MultiVersionApi::api->createDatabase(makeReference<ClusterConnectionMemoryRecord>(connectionString));
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
loop {
try {
dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
UID clusterId = wait(dataClusterRegister(dataClusterTr, name, metaclusterId));
entry.id = clusterId;
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(dataClusterTr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("ConfiguredDataCluster")
.detail("ClusterName", name)
.detail("ClusterID", entry.id)
.detail("Capacity", entry.capacity)
.detail("Version", dataClusterTr->getCommittedVersion())
.detail("ConnectionString", connectionString.toString());
break;
} catch (Error& e) {
wait(safeThreadFutureToFuture(dataClusterTr->onError(e)));
}
}
// Step 3: Register the cluster in the management cluster
state Reference<typename DB::TransactionT> registerTr = db->createTransaction();
loop {
try {
registerTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(managementClusterRegister(registerTr, name, connectionString.toString(), entry));
wait(managementClusterRegister(registerTr, name, connectionString, entry));
if (BUGGIFY) {
throw commit_unknown_result();
@ -285,7 +353,7 @@ Future<Void> registerCluster(Reference<DB> db,
throw commit_unknown_result();
}
TraceEvent("RegisteredDataCluster")
TraceEvent("FinalizedDataCluster")
.detail("ClusterName", name)
.detail("ClusterID", entry.id)
.detail("Capacity", entry.capacity)
@ -298,74 +366,6 @@ Future<Void> registerCluster(Reference<DB> db,
}
}
// Step 2: Configure the data cluster as a subordinate cluster
state Reference<IDatabase> dataClusterDb =
MultiVersionApi::api->createDatabase(makeReference<ClusterConnectionMemoryRecord>(connectionString));
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
try {
loop {
try {
dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(dataClusterRegister(dataClusterTr, name, entry.id));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(dataClusterTr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("ConfiguredDataCluster")
.detail("ClusterName", name)
.detail("ClusterID", entry.id)
.detail("Capacity", entry.capacity)
.detail("Version", dataClusterTr->getCommittedVersion())
.detail("ConnectionString", connectionString.toString());
break;
} catch (Error& e) {
wait(safeThreadFutureToFuture(dataClusterTr->onError(e)));
}
}
} catch (Error& e) {
// TODO: Remove cluster if the parameters match and the cluster is in registering state
}
// Step 3: Record that the cluster is ready in the management cluster
state Reference<typename DB::TransactionT> finalizeTr = db->createTransaction();
loop {
try {
finalizeTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
wait(finalizeRegistration(finalizeTr, name, entry.id));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(finalizeTr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("FinalizedDataCluster")
.detail("ClusterName", name)
.detail("ClusterID", entry.id)
.detail("Capacity", entry.capacity)
.detail("Version", finalizeTr->getCommittedVersion())
.detail("ConnectionString", connectionString.toString());
break;
} catch (Error& e) {
wait(safeThreadFutureToFuture(finalizeTr->onError(e)));
}
}
return Void();
}

View File

@ -1415,7 +1415,6 @@ const KeyRef dataClusterMetadataPrefix = dataClusterMetadataKeys.begin;
const KeyRangeRef dataClusterConnectionRecordKeys("\xff/metacluster/dataCluster/connectionString/"_sr,
"\xff/metacluster/dataCluster/connectionString0"_sr);
const KeyRef dataClusterConnectionRecordPrefix = dataClusterConnectionRecordKeys.begin;
const KeyRef dataClusterLastIdKey = "\xff/metacluster/dataCluster/lastId/"_sr;
// Metacluster data cluster keys
const KeyRef dataClusterRegistrationKey = "\xff/metacluster/dataCluster/clusterRegistration"_sr;

View File

@ -633,7 +633,6 @@ extern const KeyRangeRef dataClusterMetadataKeys;
extern const KeyRef dataClusterMetadataPrefix;
extern const KeyRangeRef dataClusterConnectionRecordKeys;
extern const KeyRef dataClusterConnectionRecordPrefix;
extern const KeyRef dataClusterLastIdKey;
extern const KeyRef dataClusterRegistrationKey;
#pragma clang diagnostic pop