From 5db1135ac6f721ed07a539d735071751e0663ab3 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Thu, 28 Jul 2022 16:44:20 -0700 Subject: [PATCH] Add MetaclusterOperationContext to make it easier to run multi-transaction operations --- fdbclient/include/fdbclient/Metacluster.h | 44 +- .../fdbclient/MetaclusterManagement.actor.h | 1566 ++++++++--------- .../fdbclient/TenantManagement.actor.h | 66 +- ...antManagementConcurrencyWorkload.actor.cpp | 55 +- flow/include/flow/error_definitions.h | 1 + 5 files changed, 873 insertions(+), 859 deletions(-) diff --git a/fdbclient/include/fdbclient/Metacluster.h b/fdbclient/include/fdbclient/Metacluster.h index 26bcdeafe2..a475177194 100644 --- a/fdbclient/include/fdbclient/Metacluster.h +++ b/fdbclient/include/fdbclient/Metacluster.h @@ -120,7 +120,31 @@ struct MetaclusterRegistrationEntry { ASSERT(metaclusterName != name && metaclusterId != id); } - Value encode() { return ObjectWriter::toValue(*this, IncludeVersion()); } + // Returns true if this entry is associated with the same cluster as the passed in entry. If one entry is from the + // management cluster and the other is from a data cluster, this checks whether they are part of the same + // metacluster. + bool matches(MetaclusterRegistrationEntry const& other) const { + if (metaclusterName != other.metaclusterName || metaclusterId != other.metaclusterId) { + return false; + } else if (clusterType == ClusterType::METACLUSTER_DATA && other.clusterType == ClusterType::METACLUSTER_DATA && + (name != other.name || id != other.id)) { + return false; + } + + return true; + } + + MetaclusterRegistrationEntry toManagementClusterRegistration() const { + ASSERT(clusterType == ClusterType::METACLUSTER_DATA); + return MetaclusterRegistrationEntry(metaclusterName, metaclusterId); + } + + MetaclusterRegistrationEntry toDataClusterRegistration(ClusterName name, UID id) const { + ASSERT(clusterType == ClusterType::METACLUSTER_MANAGEMENT); + return MetaclusterRegistrationEntry(metaclusterName, name, metaclusterId, id); + } + + Value encode() const { return ObjectWriter::toValue(*this, IncludeVersion()); } static MetaclusterRegistrationEntry decode(ValueRef const& value) { MetaclusterRegistrationEntry entry; ObjectReader reader(value.begin(), IncludeVersion()); @@ -132,12 +156,30 @@ struct MetaclusterRegistrationEntry { [](ValueRef const& v) { return MetaclusterRegistrationEntry::decode(v); }); } + std::string toString() const { + if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) { + return fmt::format( + "metacluster name: {}, metacluster id: {}\n", printable(metaclusterName), metaclusterId.shortString()); + } else { + return fmt::format("metacluster name: {}, metacluster id: {}, data cluster name: {}, data cluster id: {}\n", + printable(metaclusterName), + metaclusterId.shortString(), + printable(name), + id.shortString()); + } + } + template void serialize(Ar& ar) { serializer(ar, clusterType, metaclusterName, name, metaclusterId, id); } }; +template <> +struct Traceable : std::true_type { + static std::string toString(MetaclusterRegistrationEntry const& entry) { return entry.toString(); } +}; + struct MetaclusterMetadata { // Registration information for a metacluster, stored on both management and data clusters static inline KeyBackedObjectProperty diff --git a/fdbclient/include/fdbclient/MetaclusterManagement.actor.h b/fdbclient/include/fdbclient/MetaclusterManagement.actor.h index a4abc77a56..be78c2a47e 100644 --- a/fdbclient/include/fdbclient/MetaclusterManagement.actor.h +++ b/fdbclient/include/fdbclient/MetaclusterManagement.actor.h @@ -20,6 +20,8 @@ #pragma once #include "fdbclient/FDBOptions.g.h" +#include "flow/IRandom.h" +#include "flow/ThreadHelper.actor.h" #if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_METACLUSTER_MANAGEMENT_ACTOR_G_H) #define FDBCLIENT_METACLUSTER_MANAGEMENT_ACTOR_G_H #include "fdbclient/MetaclusterManagement.actor.g.h" @@ -116,47 +118,7 @@ struct ManagementClusterMetadata { static KeyBackedSet clusterTenantGroupIndex; }; -template -Future> tryGetTenantTransaction(Transaction tr, TenantName name) { - tr->setOption(FDBTransactionOptions::RAW_ACCESS); - return ManagementClusterMetadata::tenantMetadata.tenantMap.get(tr, name); -} - -ACTOR template -Future> tryGetTenant(Reference db, TenantName name) { - state Reference tr = db->createTransaction(); - - loop { - try { - tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); - Optional entry = wait(tryGetTenantTransaction(tr, name)); - return entry; - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } - } -} - -ACTOR template -Future getTenantTransaction(Transaction tr, TenantName name) { - Optional entry = wait(tryGetTenantTransaction(tr, name)); - if (!entry.present()) { - throw tenant_not_found(); - } - - return entry.get(); -} - -ACTOR template -Future getTenant(Reference db, TenantName name) { - Optional entry = wait(tryGetTenant(db, name)); - if (!entry.present()) { - throw tenant_not_found(); - } - - return entry.get(); -} +ACTOR Future> openDatabase(ClusterConnectionString connectionString); ACTOR template Future> tryGetClusterTransaction(Transaction tr, ClusterName name) { @@ -217,8 +179,6 @@ Future getCluster(Reference db, ClusterName name) { return metadata.get(); } -ACTOR Future> openDatabase(ClusterConnectionString connectionString); - ACTOR template Future> getAndOpenDatabase(Transaction managementTr, ClusterName clusterName) { DataClusterMetadata clusterMetadata = wait(getClusterTransaction(managementTr, clusterName)); @@ -226,6 +186,236 @@ Future> getAndOpenDatabase(Transaction managementTr, Cluste return db; } +template +struct MetaclusterOperationContext { + Reference managementDb; + Reference dataClusterDb; + + UID id = nondeterministicRandom()->randomUniqueID(); + + Optional clusterName; + + Optional metaclusterRegistration; + Optional dataClusterMetadata; + + MetaclusterOperationContext(Reference managementDb, Optional clusterName = {}) + : managementDb(managementDb), clusterName(clusterName) { + } + + // Run a transaction on the management cluster. This verifies that the cluster is a management cluster and matches + // the same metacluster that we've run any previous transactions on. If a clusterName is set, it also verifies that + // the specified cluster is present. Stores the metaclusterRegistration entry and, if a clusterName is set, the + // dataClusterMetadata and dataClusterDb in the context. + ACTOR template + static Future()(Reference()).getValue())> + runManagementTransaction(MetaclusterOperationContext* self, Function func) { + state Reference tr = self->managementDb->createTransaction(); + state bool clusterPresentAtStart = self->clusterName.present(); + loop { + try { + // If this transaction is retrying and didn't have the cluster name set at the beginning, clear it out + // to be set again in the next iteration. + if (!clusterPresentAtStart) { + self->clearCluster(); + } + + // Get the data cluster metadata for the specified cluster, if present + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + state Future> dataClusterMetadataFuture; + if (self->clusterName.present()) { + dataClusterMetadataFuture = tryGetClusterTransaction(tr, self->clusterName.get()); + } + + // Get the metacluster registration information + state Optional currentMetaclusterRegistration = + wait(MetaclusterMetadata::metaclusterRegistration.get(tr)); + + state Optional currentDataClusterMetadata; + if (self->clusterName.present()) { + wait(store(currentDataClusterMetadata, dataClusterMetadataFuture)); + } + + // Check that this is a management cluster and is the same metacluster that any previous transactions + // have run on. + if (!currentMetaclusterRegistration.present() || + currentMetaclusterRegistration.get().clusterType != ClusterType::METACLUSTER_MANAGEMENT) { + throw invalid_metacluster_operation(); + } else if (self->metaclusterRegistration.present() && + !self->metaclusterRegistration.get().matches(currentMetaclusterRegistration.get())) { + throw invalid_metacluster_operation(); + } + + // If a cluster was specified, check that the cluster metadata is present. If so, load it and store it + // in the context. Additionally, store the data cluster details in the local metacluster registration + // entry. + if (self->clusterName.present()) { + if (!currentDataClusterMetadata.present()) { + throw cluster_not_found(); + } else { + currentMetaclusterRegistration = currentMetaclusterRegistration.get().toDataClusterRegistration( + self->clusterName.get(), currentDataClusterMetadata.get().entry.id); + } + } + + // Store the metacluster registration entry + if (!self->metaclusterRegistration.present()) { + self->metaclusterRegistration = currentMetaclusterRegistration; + } + + // Check that our data cluster has the same ID as previous transactions. If so, then store the updated + // cluster metadata in the context and open a connection to the data DB. + if (self->dataClusterMetadata.present() && + self->dataClusterMetadata.get().entry.id != currentDataClusterMetadata.get().entry.id) { + throw cluster_not_found(); + } else if (self->clusterName.present()) { + self->dataClusterMetadata = currentDataClusterMetadata; + if (!self->dataClusterDb) { + wait( + store(self->dataClusterDb, openDatabase(self->dataClusterMetadata.get().connectionString))); + } + } + + state decltype(std::declval()(Reference()).getValue()) result = + wait(func(tr)); + + wait(buggifiedCommit(tr, BUGGIFY_WITH_PROB(0.1))); + return result; + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } + } + + template + Future()(Reference()).getValue())> + runManagementTransaction(Function func) { + return runManagementTransaction(this, func); + } + + // Runs a transaction on the data cluster. This requires that a cluster name be set and that a transaction has + // already been run on the management cluster to populate the needed metadata. This verifies that the data cluster + // has the expected ID and is part of the metacluster that previous transactions have run on. + ACTOR template + static Future()(Reference()).getValue())> + runDataClusterTransaction(MetaclusterOperationContext* self, Function func) { + ASSERT(self->dataClusterDb); + ASSERT(self->dataClusterMetadata.present()); + ASSERT(self->metaclusterRegistration.present() && + self->metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA); + + state Reference tr = self->dataClusterDb->createTransaction(); + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + + state Optional currentMetaclusterRegistration = + wait(MetaclusterMetadata::metaclusterRegistration.get(tr)); + + // Check that this is the expected data cluster and is part of the right metacluster + if (!currentMetaclusterRegistration.present() || + currentMetaclusterRegistration.get().clusterType != ClusterType::METACLUSTER_DATA) { + throw invalid_metacluster_operation(); + } else if (!self->metaclusterRegistration.get().matches(currentMetaclusterRegistration.get())) { + throw invalid_metacluster_operation(); + } + + state decltype(std::declval()(Reference()).getValue()) result = + wait(func(tr)); + + wait(safeThreadFutureToFuture(tr->commit())); + return result; + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } + } + + template + Future()(Reference()).getValue())> + runDataClusterTransaction(Function func) { + return runDataClusterTransaction(this, func); + } + + ACTOR static Future updateClusterName(MetaclusterOperationContext* self, + Reference tr) { + state DataClusterMetadata currentDataClusterMetadata = wait(getClusterTransaction(tr, self->clusterName.get())); + + self->metaclusterRegistration = self->metaclusterRegistration.get().toDataClusterRegistration( + self->clusterName.get(), currentDataClusterMetadata.entry.id); + + self->dataClusterMetadata = currentDataClusterMetadata; + if (!self->dataClusterDb) { + wait(store(self->dataClusterDb, openDatabase(self->dataClusterMetadata.get().connectionString))); + } + + return Void(); + } + + // Sets the cluster used in this context. This must be called from a management cluster transaction, and it + // will load the cluster metadata and connect to the cluster. + Future setCluster(Reference tr, ClusterName clusterName) { + ASSERT(!this->clusterName.present()); + ASSERT(!dataClusterMetadata.present()); + ASSERT(metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_MANAGEMENT); + this->clusterName = clusterName; + return updateClusterName(this, tr); + } + + // Clears the chosen cluster for this context. This is useful if we are retrying a transaction that expects an + // uninitialized cluster. + void clearCluster() { + clusterName = {}; + dataClusterMetadata = {}; + dataClusterDb = {}; + if (metaclusterRegistration.present() && + metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA) { + metaclusterRegistration = metaclusterRegistration.get().toManagementClusterRegistration(); + } + } +}; + +template +Future> tryGetTenantTransaction(Transaction tr, TenantName name) { + tr->setOption(FDBTransactionOptions::RAW_ACCESS); + return ManagementClusterMetadata::tenantMetadata.tenantMap.get(tr, name); +} + +ACTOR template +Future> tryGetTenant(Reference db, TenantName name) { + state Reference tr = db->createTransaction(); + + loop { + try { + tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE); + Optional entry = wait(tryGetTenantTransaction(tr, name)); + return entry; + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } +} + +ACTOR template +Future getTenantTransaction(Transaction tr, TenantName name) { + Optional entry = wait(tryGetTenantTransaction(tr, name)); + if (!entry.present()) { + throw tenant_not_found(); + } + + return entry.get(); +} + +ACTOR template +Future getTenant(Reference db, TenantName name) { + Optional entry = wait(tryGetTenant(db, name)); + if (!entry.present()) { + throw tenant_not_found(); + } + + return entry.get(); +} + ACTOR template Future managementClusterCheckEmpty(Transaction tr) { state Future>> tenantsFuture = @@ -357,6 +547,9 @@ void updateClusterMetadata(Transaction tr, Optional updatedEntry) { if (updatedEntry.present()) { + if (previousMetadata.entry.locked) { + throw cluster_locked(); + } ManagementClusterMetadata::dataClusters.set(tr, name, updatedEntry.get()); updateClusterCapacityIndex(tr, name, previousMetadata.entry, updatedEntry.get()); } @@ -365,181 +558,143 @@ void updateClusterMetadata(Transaction tr, } } -ACTOR template -Future> -managementClusterRegisterPrecheck(Transaction tr, ClusterNameRef name, Optional metadata) { - state Future> dataClusterMetadataFuture = tryGetClusterTransaction(tr, name); +template +struct RegisterClusterImpl { + MetaclusterOperationContext ctx; - state Optional metaclusterRegistration = - wait(MetaclusterMetadata::metaclusterRegistration.get(tr)); + // Initialization parameters + ClusterName clusterName; + ClusterConnectionString connectionString; + DataClusterEntry clusterEntry; - if (!metaclusterRegistration.present() || - metaclusterRegistration.get().clusterType != ClusterType::METACLUSTER_MANAGEMENT) { - throw invalid_metacluster_operation(); - } + RegisterClusterImpl(Reference managementDb, + ClusterName clusterName, + ClusterConnectionString connectionString, + DataClusterEntry clusterEntry) + : ctx(managementDb), clusterName(clusterName), connectionString(connectionString), clusterEntry(clusterEntry) {} - state Optional dataClusterMetadata = wait(dataClusterMetadataFuture); - if (dataClusterMetadata.present() && - (!metadata.present() || !metadata.get().matchesConfiguration(dataClusterMetadata.get()))) { - throw cluster_already_exists(); - } - - return std::make_pair(metaclusterRegistration.get(), dataClusterMetadata.present()); -} - -ACTOR template -Future managementClusterRegister(Transaction tr, - ClusterNameRef name, - ClusterConnectionString connectionString, - DataClusterEntry entry) { - std::pair result = - wait(managementClusterRegisterPrecheck(tr, name, DataClusterMetadata(entry, connectionString))); - - if (!result.second) { - entry.allocated = ClusterUsage(); - - if (entry.hasCapacity()) { - ManagementClusterMetadata::clusterCapacityIndex.insert( - tr, Tuple::makeTuple(entry.allocated.numTenantGroups, name)); + // Check that cluster name is available + ACTOR static Future registrationPrecheck(RegisterClusterImpl* self, Reference tr) { + state Optional dataClusterMetadata = wait(tryGetClusterTransaction(tr, self->clusterName)); + if (dataClusterMetadata.present()) { + throw cluster_already_exists(); } - ManagementClusterMetadata::dataClusters.set(tr, name, entry); - ManagementClusterMetadata::dataClusterConnectionRecords.set(tr, name, connectionString); + + return Void(); } - return Void(); -} + ACTOR static Future configureDataCluster(RegisterClusterImpl* self) { + state Reference dataClusterDb = wait(openDatabase(self->connectionString)); + state Reference tr = dataClusterDb->createTransaction(); + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); -ACTOR template -Future dataClusterRegister(Transaction tr, - ClusterNameRef name, - ClusterNameRef metaclusterName, - UID metaclusterId) { - state Future>> existingTenantsFuture = - TenantAPI::listTenantsTransaction(tr, ""_sr, "\xff\xff"_sr, 1); - state typename transaction_future_type::type existingDataFuture = - tr->getRange(normalKeys, 1); - state Future> clusterRegistrationFuture = - MetaclusterMetadata::metaclusterRegistration.get(tr); + state Future>> existingTenantsFuture = + TenantAPI::listTenantsTransaction(tr, ""_sr, "\xff\xff"_sr, 1); + state ThreadFuture existingDataFuture = tr->getRange(normalKeys, 1); - state Optional existingRegistration = wait(clusterRegistrationFuture); - if (existingRegistration.present()) { - if (existingRegistration.get().clusterType != ClusterType::METACLUSTER_DATA || - existingRegistration.get().name != name || existingRegistration.get().metaclusterId != metaclusterId) { - throw cluster_already_registered(); - } else { - // We already successfully registered the cluster with these details, so there's nothing to do - ASSERT(existingRegistration.get().metaclusterName == metaclusterName); - return existingRegistration.get().id; + // Check whether this cluster has already been registered + state Optional existingRegistration = + wait(MetaclusterMetadata::metaclusterRegistration.get(tr)); + if (existingRegistration.present()) { + if (existingRegistration.get().clusterType != ClusterType::METACLUSTER_DATA || + existingRegistration.get().name != self->clusterName || + !existingRegistration.get().matches(self->ctx.metaclusterRegistration.get())) { + throw cluster_already_registered(); + } else { + // We already successfully registered the cluster with these details, so there's nothing to do + return Void(); + } + } + + // Check for any existing data + std::vector> existingTenants = + wait(safeThreadFutureToFuture(existingTenantsFuture)); + if (!existingTenants.empty()) { + TraceEvent(SevWarn, "CannotRegisterClusterWithTenants").detail("ClusterName", self->clusterName); + throw cluster_not_empty(); + } + + RangeResult existingData = wait(safeThreadFutureToFuture(existingDataFuture)); + if (!existingData.empty()) { + TraceEvent(SevWarn, "CannotRegisterClusterWithData").detail("ClusterName", self->clusterName); + throw cluster_not_empty(); + } + + self->clusterEntry.id = deterministicRandom()->randomUniqueID(); + MetaclusterMetadata::metaclusterRegistration.set( + tr, + self->ctx.metaclusterRegistration.get().toDataClusterRegistration(self->clusterName, + self->clusterEntry.id)); + + wait(buggifiedCommit(tr, BUGGIFY)); + + TraceEvent("ConfiguredDataCluster") + .detail("ClusterName", self->clusterName) + .detail("ClusterID", self->clusterEntry.id) + .detail("Capacity", self->clusterEntry.capacity) + .detail("Version", tr->getCommittedVersion()) + .detail("ConnectionString", self->connectionString.toString()); + + return Void(); + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } } } - std::vector> existingTenants = - wait(safeThreadFutureToFuture(existingTenantsFuture)); - if (!existingTenants.empty()) { - TraceEvent(SevWarn, "CannotRegisterClusterWithTenants").detail("ClusterName", name); - throw cluster_not_empty(); + // Store the cluster entry for the new cluster + ACTOR static Future registerInManagementCluster(RegisterClusterImpl* self, + Reference tr) { + state Optional dataClusterMetadata = wait(tryGetClusterTransaction(tr, self->clusterName)); + if (dataClusterMetadata.present() && !dataClusterMetadata.get().matchesConfiguration( + DataClusterMetadata(self->clusterEntry, self->connectionString))) { + throw cluster_already_exists(); + } else if (!dataClusterMetadata.present()) { + self->clusterEntry.allocated = ClusterUsage(); + + if (self->clusterEntry.hasCapacity()) { + ManagementClusterMetadata::clusterCapacityIndex.insert( + tr, Tuple::makeTuple(self->clusterEntry.allocated.numTenantGroups, self->clusterName)); + } + ManagementClusterMetadata::dataClusters.set(tr, self->clusterName, self->clusterEntry); + ManagementClusterMetadata::dataClusterConnectionRecords.set(tr, self->clusterName, self->connectionString); + } + + TraceEvent("RegisteredDataCluster") + .detail("ClusterName", self->clusterName) + .detail("ClusterID", self->clusterEntry.id) + .detail("Capacity", self->clusterEntry.capacity) + .detail("Version", tr->getCommittedVersion()) + .detail("ConnectionString", self->connectionString.toString()); + + return Void(); } - RangeResult existingData = wait(safeThreadFutureToFuture(existingDataFuture)); - if (!existingData.empty()) { - TraceEvent(SevWarn, "CannotRegisterClusterWithData").detail("ClusterName", name); - throw cluster_not_empty(); + ACTOR static Future run(RegisterClusterImpl* self) { + wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return registrationPrecheck(self, tr); })); + // Don't use ctx to run this transaction because we have not set up the data cluster metadata on it and we don't + // have a metacluster registration on the data cluster + wait(configureDataCluster(self)); + wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return registerInManagementCluster(self, tr); })); + return Void(); } - - state UID clusterId = deterministicRandom()->randomUniqueID(); - MetaclusterMetadata::metaclusterRegistration.set( - tr, MetaclusterRegistrationEntry(metaclusterName, name, metaclusterId, clusterId)); - - return clusterId; -} + Future run() { return run(this); } +}; ACTOR template Future registerCluster(Reference db, ClusterName name, ClusterConnectionString connectionString, DataClusterEntry entry) { - if (name.startsWith("\xff"_sr)) { - throw invalid_cluster_name(); - } - - state MetaclusterRegistrationEntry managementClusterRegistration; - - // Step 1: Check for a conflicting cluster in the management cluster and get the metacluster ID - state Reference precheckTr = db->createTransaction(); - loop { - try { - precheckTr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); - std::pair result = - wait(managementClusterRegisterPrecheck(precheckTr, name, Optional())); - managementClusterRegistration = result.first; - - wait(buggifiedCommit(precheckTr, BUGGIFY)); - break; - } catch (Error& e) { - wait(safeThreadFutureToFuture(precheckTr->onError(e))); - } - } - - // Step 2: Configure the data cluster as a subordinate cluster - state Reference dataClusterDb = wait(openDatabase(connectionString)); - state Reference dataClusterTr = dataClusterDb->createTransaction(); - loop { - try { - dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - UID clusterId = wait(dataClusterRegister( - dataClusterTr, name, managementClusterRegistration.name, managementClusterRegistration.metaclusterId)); - entry.id = clusterId; - - wait(buggifiedCommit(dataClusterTr, BUGGIFY)); - - TraceEvent("ConfiguredDataCluster") - .detail("ClusterName", name) - .detail("ClusterID", entry.id) - .detail("Capacity", entry.capacity) - .detail("Version", dataClusterTr->getCommittedVersion()) - .detail("ConnectionString", connectionString.toString()); - - break; - } catch (Error& e) { - wait(safeThreadFutureToFuture(dataClusterTr->onError(e))); - } - } - - // Step 3: Register the cluster in the management cluster - state Reference registerTr = db->createTransaction(); - loop { - try { - registerTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - wait(managementClusterRegister(registerTr, name, connectionString, entry)); - wait(buggifiedCommit(registerTr, BUGGIFY)); - - TraceEvent("RegisteredDataCluster") - .detail("ClusterName", name) - .detail("ClusterID", entry.id) - .detail("Capacity", entry.capacity) - .detail("Version", registerTr->getCommittedVersion()) - .detail("ConnectionString", connectionString.toString()); - - break; - } catch (Error& e) { - wait(safeThreadFutureToFuture(registerTr->onError(e))); - } - } - + state RegisterClusterImpl impl(db, name, connectionString, entry); + wait(impl.run()); return Void(); } -ACTOR template -Future> restoreClusterTransaction(Transaction tr, - ClusterName name, - std::string connectionString, - DataClusterEntry entry, - AddNewTenants addNewTenants, - RemoveMissingTenants removeMissingTenants) { - wait(delay(0)); // TODO: remove when implementation is added - return Optional(); -} - ACTOR template Future restoreCluster(Reference db, ClusterName name, @@ -547,279 +702,215 @@ Future restoreCluster(Reference db, DataClusterEntry entry, AddNewTenants addNewTenants, RemoveMissingTenants removeMissingTenants) { - state Reference tr = db->createTransaction(); + // TODO: add implementation + wait(delay(0.0)); + return Void(); +} - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); +template +struct RemoveClusterImpl { + MetaclusterOperationContext ctx; - state Optional newCluster = - wait(restoreCluster(tr, name, connectionString, entry, addNewTenants, removeMissingTenants)); + // Initialization parameters + bool forceRemove; - wait(buggifiedCommit(tr, BUGGIFY)); + // Parameters set in lockDataCluster + Optional lastTenantId; - TraceEvent("RestoredDataCluster") - .detail("ClusterName", name) - .detail("ClusterId", newCluster.present() ? newCluster.get().id : UID()) - .detail("Version", tr->getCommittedVersion()); + RemoveClusterImpl(Reference managementDb, ClusterName clusterName, bool forceRemove) + : ctx(managementDb, clusterName), forceRemove(forceRemove) {} - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); + // Returns false if the cluster is no longer present, or true if it is present and the removal should proceed. + ACTOR static Future lockDataCluster(RemoveClusterImpl* self, Reference tr) { + if (!self->forceRemove && self->ctx.dataClusterMetadata.get().entry.allocated.numTenantGroups > 0) { + throw cluster_not_empty(); + } else if (!self->ctx.dataClusterMetadata.get().entry.locked) { + // Lock the cluster while we finish the remaining removal steps to prevent new tenants from being + // assigned to it. + DataClusterEntry updatedEntry = self->ctx.dataClusterMetadata.get().entry; + updatedEntry.locked = true; + updatedEntry.capacity.numTenantGroups = 0; + + updateClusterMetadata(tr, + self->ctx.clusterName.get(), + self->ctx.dataClusterMetadata.get(), + Optional(), + updatedEntry); } - } -} -// Returns the cluster metadata for the cluster being deleted, as well as a boolean that will be true if the entry -// has been removed. If false, then it's the responsibility of the caller to purge the data cluster from the management -// cluster. -ACTOR template -Future, bool>> managementClusterRemove(Transaction tr, - ClusterNameRef name, - bool checkEmpty) { - tr->setOption(FDBTransactionOptions::RAW_ACCESS); + ManagementClusterMetadata::clusterCapacityIndex.erase( + tr, + Tuple::makeTuple(self->ctx.dataClusterMetadata.get().entry.allocated.numTenantGroups, + self->ctx.clusterName.get())); - state Optional metadata = wait(tryGetClusterTransaction(tr, name)); - if (!metadata.present()) { - return std::make_pair(metadata, true); + // Get the last allocated tenant ID to be used on the detached data cluster + if (self->forceRemove) { + Optional lastId = wait(ManagementClusterMetadata::tenantMetadata.lastTenantId.get(tr)); + self->lastTenantId = lastId; + } + + TraceEvent("LockedDataCluster") + .detail("Name", self->ctx.clusterName.get()) + .detail("Version", tr->getCommittedVersion()); + + return true; } - bool purged = false; - if (checkEmpty && metadata.get().entry.allocated.numTenantGroups > 0) { - throw cluster_not_empty(); - } else if (metadata.get().entry.allocated.numTenantGroups == 0) { - ManagementClusterMetadata::dataClusters.erase(tr, name); - ManagementClusterMetadata::dataClusterConnectionRecords.erase(tr, name); - purged = true; - } else { - // We need to clean up the tenant metadata for this cluster before erasing it. While we are doing that, - // lock the entry to prevent other assignments. - DataClusterEntry updatedEntry = metadata.get().entry; - updatedEntry.locked = true; + // Delete metacluster metadata from the data cluster + ACTOR static Future updateDataCluster(RemoveClusterImpl* self, Reference tr) { + // Delete metacluster related metadata + MetaclusterMetadata::metaclusterRegistration.clear(tr); + TenantMetadata::tenantTombstones.clear(tr); + TenantMetadata::tombstoneCleanupData.clear(tr); - updateClusterMetadata(tr, name, metadata.get(), Optional(), updatedEntry); + // If we are force removing a cluster, then it will potentially contain tenants that have IDs + // larger than the next tenant ID to be allocated on the cluster. To avoid collisions, we advance + // the ID so that it will be the larger of the current one on the data cluster and the management + // cluster. + if (self->lastTenantId.present()) { + Optional lastId = wait(TenantMetadata::lastTenantId.get(tr)); + if (!lastId.present() || lastId.get() < self->lastTenantId.get()) { + TenantMetadata::lastTenantId.set(tr, self->lastTenantId.get()); + } + } + + TraceEvent("ReconfiguredDataCluster") + .detail("Name", self->ctx.clusterName.get()) + .detail("Version", tr->getCommittedVersion()); + + return Void(); } - ManagementClusterMetadata::clusterCapacityIndex.erase( - tr, Tuple::makeTuple(metadata.get().entry.allocated.numTenantGroups, name)); + // Returns true if all tenants have been purged + ACTOR static Future purgeTenants(RemoveClusterImpl* self, + Reference tr, + std::pair clusterTupleRange) { + // Get the list of tenants + state Future> tenantEntriesFuture = + ManagementClusterMetadata::clusterTenantIndex.getRange( + tr, clusterTupleRange.first, clusterTupleRange.second, CLIENT_KNOBS->REMOVE_CLUSTER_TENANT_BATCH_SIZE); - return std::make_pair(metadata, purged); -} + state KeyBackedRangeResult tenantEntries = wait(tenantEntriesFuture); -ACTOR template -Future managementClusterPurgeDataCluster(Reference db, ClusterNameRef name, UID dataClusterId) { - state Reference tr = db->createTransaction(); - state std::pair clusterTupleRange = - std::make_pair(Tuple::makeTuple(name), Tuple::makeTuple(keyAfter(name))); + // Erase each tenant from the tenant map on the management cluster + for (Tuple entry : tenantEntries.results) { + ASSERT(entry.getString(0) == self->ctx.clusterName.get()); + ManagementClusterMetadata::tenantMetadata.tenantMap.erase(tr, entry.getString(1)); + } - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future> tenantEntriesFuture = - ManagementClusterMetadata::clusterTenantIndex.getRange(tr, - clusterTupleRange.first, - clusterTupleRange.second, - CLIENT_KNOBS->REMOVE_CLUSTER_TENANT_BATCH_SIZE); - - Optional clusterMetadata = wait(tryGetClusterTransaction(tr, name)); - if (!clusterMetadata.present() && clusterMetadata.get().entry.id != dataClusterId) { - // Someone else must have already done the purge - return Void(); - } - - state KeyBackedRangeResult tenantEntries = wait(tenantEntriesFuture); - if (tenantEntries.results.empty()) { - break; - } - - for (Tuple entry : tenantEntries.results) { - ASSERT(entry.getString(0) == name); - ManagementClusterMetadata::tenantMetadata.tenantMap.erase(tr, entry.getString(1)); - } - - ManagementClusterMetadata::tenantMetadata.tenantCount.atomicOp( - tr, -tenantEntries.results.size(), MutationRef::AddValue); - ManagementClusterMetadata::clusterTenantCount.atomicOp( - tr, name, -tenantEntries.results.size(), MutationRef::AddValue); - - // Erase all of the tenants processed in this transaction from the cluster tenant index + // Erase all of the tenants processed in this transaction from the cluster tenant index + if (!tenantEntries.results.empty()) { ManagementClusterMetadata::clusterTenantIndex.erase( tr, clusterTupleRange.first, - Tuple::makeTuple(name, keyAfter(tenantEntries.results.rbegin()->getString(1)))); - - wait(buggifiedCommit(tr, BUGGIFY)); - tr->reset(); - - if (!tenantEntries.more) { - break; - } - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); + Tuple::makeTuple(self->ctx.clusterName.get(), keyAfter(tenantEntries.results.rbegin()->getString(1)))); } + + ManagementClusterMetadata::tenantMetadata.tenantCount.atomicOp( + tr, -tenantEntries.results.size(), MutationRef::AddValue); + ManagementClusterMetadata::clusterTenantCount.atomicOp( + tr, self->ctx.clusterName.get(), -tenantEntries.results.size(), MutationRef::AddValue); + + return !tenantEntries.more; } - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future> tenantGroupEntriesFuture = - ManagementClusterMetadata::clusterTenantGroupIndex.getRange( - tr, - clusterTupleRange.first, - clusterTupleRange.second, - CLIENT_KNOBS->REMOVE_CLUSTER_TENANT_BATCH_SIZE); + // Returns true if all tenant groups and the data cluster have been purged + ACTOR static Future purgeTenantGroupsAndDataCluster(RemoveClusterImpl* self, + Reference tr, + std::pair clusterTupleRange) { + // Get the list of tenant groups + state Future> tenantGroupEntriesFuture = + ManagementClusterMetadata::clusterTenantGroupIndex.getRange( + tr, clusterTupleRange.first, clusterTupleRange.second, CLIENT_KNOBS->REMOVE_CLUSTER_TENANT_BATCH_SIZE); - Optional clusterMetadata = wait(tryGetClusterTransaction(tr, name)); - if (!clusterMetadata.present() && clusterMetadata.get().entry.id != dataClusterId) { - // Someone else must have already done the purge - return Void(); - } - - state KeyBackedRangeResult tenantGroupEntries = wait(tenantGroupEntriesFuture); - for (Tuple entry : tenantGroupEntries.results) { - ASSERT(entry.getString(0) == name); - TenantGroupName tenantGroup = entry.getString(1); - ManagementClusterMetadata::tenantMetadata.tenantGroupTenantIndex.erase( - tr, Tuple::makeTuple(tenantGroup), Tuple::makeTuple(keyAfter(tenantGroup))); - ManagementClusterMetadata::tenantMetadata.tenantGroupMap.erase(tr, tenantGroup); - } - - if (!tenantGroupEntries.results.empty()) { - // Erase all of the tenants processed in this transaction from the cluster tenant index - ManagementClusterMetadata::clusterTenantIndex.erase( - tr, - clusterTupleRange.first, - Tuple::makeTuple(name, keyAfter(tenantGroupEntries.results.rbegin()->getString(1)))); - } - - if (!tenantGroupEntries.more) { - ManagementClusterMetadata::dataClusters.erase(tr, name); - ManagementClusterMetadata::dataClusterConnectionRecords.erase(tr, name); - } - - wait(buggifiedCommit(tr, BUGGIFY)); - tr->reset(); - - if (!tenantGroupEntries.more) { - TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion()); - break; - } - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); + // Erase each tenant group from the tenant group map and the tenant group tenant index + state KeyBackedRangeResult tenantGroupEntries = wait(tenantGroupEntriesFuture); + for (Tuple entry : tenantGroupEntries.results) { + ASSERT(entry.getString(0) == self->ctx.clusterName.get()); + TenantGroupName tenantGroup = entry.getString(1); + ManagementClusterMetadata::tenantMetadata.tenantGroupTenantIndex.erase( + tr, Tuple::makeTuple(tenantGroup), Tuple::makeTuple(keyAfter(tenantGroup))); + ManagementClusterMetadata::tenantMetadata.tenantGroupMap.erase(tr, tenantGroup); } + + if (!tenantGroupEntries.results.empty()) { + // Erase all of the tenant groups processed in this transaction from the cluster tenant group index + ManagementClusterMetadata::clusterTenantGroupIndex.erase( + tr, + clusterTupleRange.first, + Tuple::makeTuple(self->ctx.clusterName.get(), + keyAfter(tenantGroupEntries.results.rbegin()->getString(1)))); + } + + // Erase the data cluster record from the management cluster if processing our last batch + if (!tenantGroupEntries.more) { + ManagementClusterMetadata::dataClusters.erase(tr, self->ctx.clusterName.get()); + ManagementClusterMetadata::dataClusterConnectionRecords.erase(tr, self->ctx.clusterName.get()); + } + + return !tenantGroupEntries.more; } - return Void(); -} + // Remove all metadata associated with the data cluster from the management cluster + ACTOR static Future managementClusterPurgeDataCluster(RemoveClusterImpl* self) { + state std::pair clusterTupleRange = std::make_pair( + Tuple::makeTuple(self->ctx.clusterName.get()), Tuple::makeTuple(keyAfter(self->ctx.clusterName.get()))); -ACTOR template -Future dataClusterRemove(Transaction tr, Optional lastTenantId, UID dataClusterId) { - state Optional metaclusterRegistration = - wait(MetaclusterMetadata::metaclusterRegistration.get(tr)); - if (!metaclusterRegistration.present()) { + // First remove all tenants associated with the data cluster from the management cluster + loop { + bool clearedAll = wait(self->ctx.runManagementTransaction( + [self = self, clusterTupleRange = clusterTupleRange](Reference tr) { + return purgeTenants(self, tr, clusterTupleRange); + })); + + if (clearedAll) { + break; + } + } + + // Next remove all tenant groups associated with the data cluster from the management cluster + loop { + bool clearedAll = wait(self->ctx.runManagementTransaction( + [self = self, clusterTupleRange = clusterTupleRange](Reference tr) { + return purgeTenantGroupsAndDataCluster(self, tr, clusterTupleRange); + })); + if (clearedAll) { + break; + } + } + + TraceEvent("RemovedDataCluster").detail("Name", self->ctx.clusterName.get()); return Void(); } - if (metaclusterRegistration.get().id != dataClusterId) { - return Void(); - } + ACTOR static Future run(RemoveClusterImpl* self) { + bool clusterIsPresent = wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return lockDataCluster(self, tr); })); - MetaclusterMetadata::metaclusterRegistration.clear(tr); - TenantMetadata::tenantTombstones.clear(tr); - TenantMetadata::tombstoneCleanupData.clear(tr); - - // If we are force removing a cluster, then it will potentially contain tenants that have IDs - // larger than the next tenant ID to be allocated on the cluster. To avoid collisions, we advance - // the ID so that it will be the larger of the current one on the data cluster and the management - // cluster. - if (lastTenantId.present()) { - Optional lastId = wait(TenantMetadata::lastTenantId.get(tr)); - if (!lastId.present() || lastId.get() < lastTenantId.get()) { - TenantMetadata::lastTenantId.set(tr, lastTenantId.get()); - } - } - - return Void(); -} - -ACTOR template -Future removeCluster(Reference db, ClusterName name, bool forceRemove) { - // Step 1: Remove the data cluster from the metacluster - state Reference tr = db->createTransaction(); - state DataClusterMetadata metadata; - state Optional lastTenantId; - state Optional removedId; - state bool hasBeenPurged = false; - - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - - std::pair, bool> result = - wait(managementClusterRemove(tr, name, !forceRemove)); - if (!result.first.present()) { - if (!removedId.present()) { - throw cluster_not_found(); - } else { - return Void(); + if (clusterIsPresent) { + try { + wait(self->ctx.runDataClusterTransaction( + [self = self](Reference tr) { return updateDataCluster(self, tr); })); + } catch (Error& e) { + // If this transaction gets retried, the metacluster information may have already been erased. + if (e.code() != error_code_invalid_metacluster_operation) { + throw; } } - metadata = result.first.get(); - if (!removedId.present()) { - removedId = metadata.entry.id; - } else if (removedId.get() != metadata.entry.id) { - // The cluster we were removing is gone and has already been replaced - return Void(); - } - - hasBeenPurged = result.second; - if (forceRemove) { - Optional lastId = wait(ManagementClusterMetadata::tenantMetadata.lastTenantId.get(tr)); - lastTenantId = lastId; - } - - wait(buggifiedCommit(tr, BUGGIFY)); - - if (hasBeenPurged) { - TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion()); - } else { - TraceEvent("LockedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion()); - } - break; - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } - } - - ASSERT(removedId.present()); - - // Step 2: Purge all metadata associated with the data cluster from the management cluster if this was not already - // completed in step 1. - if (!hasBeenPurged) { - wait(managementClusterPurgeDataCluster(db, name, removedId.get())); - } - - // Step 3: Update the data cluster to mark it as removed. - // Note that this is best effort; if it fails the cluster will still have been removed. - state Reference dataClusterDb = wait(openDatabase(metadata.connectionString)); - state Reference dataClusterTr = dataClusterDb->createTransaction(); - loop { - try { - dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - - wait(dataClusterRemove(dataClusterTr, lastTenantId, removedId.get())); - wait(buggifiedCommit(dataClusterTr, BUGGIFY)); - - TraceEvent("ReconfiguredDataCluster") - .detail("Name", name) - .detail("Version", dataClusterTr->getCommittedVersion()); - break; - } catch (Error& e) { - wait(safeThreadFutureToFuture(dataClusterTr->onError(e))); + // This runs multiple transactions, so the run transaction calls are inside the function + wait(managementClusterPurgeDataCluster(self)); } + + return Void(); } + Future run() { return run(this); } +}; +ACTOR template +Future removeCluster(Reference db, ClusterName name, bool forceRemove) { + state RemoveClusterImpl impl(db, name, forceRemove); + wait(impl.run()); return Void(); } @@ -953,19 +1044,17 @@ Future managementClusterRemoveTenantFromGroup(Transaction tr, template struct CreateTenantImpl { + MetaclusterOperationContext ctx; + // Initialization parameters - Reference managementDb; TenantName tenantName; TenantMapEntry tenantEntry; - // Parameters set in assignTenantAndStoreInManagementCluster - DataClusterMetadata clusterMetadata; - // Parameter set if tenant creation permanently fails on the data cluster Optional replaceExistingTenantId; CreateTenantImpl(Reference managementDb, TenantName tenantName, TenantMapEntry tenantEntry) - : managementDb(managementDb), tenantName(tenantName), tenantEntry(tenantEntry) {} + : ctx(managementDb), tenantName(tenantName), tenantEntry(tenantEntry) {} ACTOR static Future checkClusterAvailability(Reference dataClusterDb, ClusterName clusterName) { @@ -983,9 +1072,39 @@ struct CreateTenantImpl { } } + // Returns true if the tenant is already assigned and can proceed to the next step and false if it needs + // to be created. Throws an error if the tenant already exists and cannot be created. + ACTOR static Future checkForExistingTenant(CreateTenantImpl* self, Reference tr) { + // Check if the tenant already exists. If it's partially created and matches the parameters we + // specified, continue creating it. Otherwise, fail with an error. + state Optional existingEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); + if (existingEntry.present()) { + if (!existingEntry.get().matchesConfiguration(self->tenantEntry) || + (self->replaceExistingTenantId.present() && + existingEntry.get().id != self->replaceExistingTenantId.get()) || + existingEntry.get().tenantState != TenantState::REGISTERING) { + // The tenant already exists and is either completely created, has a different + // configuration, or is a different tenant than the one we intend to replace + throw tenant_already_exists(); + } else if (!self->replaceExistingTenantId.present()) { + // The tenant creation has already started, so resume where we left off + self->tenantEntry = existingEntry.get(); + ASSERT(existingEntry.get().assignedCluster.present()); + + wait(self->ctx.setCluster(tr, existingEntry.get().assignedCluster.get())); + return true; + } else { + // The previous creation is permanently failed, so create it again from scratch + } + } + + return false; + } + // Returns a pair with the name of the assigned cluster and whether the group was already assigned ACTOR static Future> assignTenant(CreateTenantImpl* self, Reference tr) { + // If our tenant group is already assigned, then we just use that assignment state Optional groupEntry; if (self->tenantEntry.tenantGroup.present()) { Optional _groupEntry = wait( @@ -998,6 +1117,7 @@ struct CreateTenantImpl { } } + // Get a set of the most full clusters that still have capacity state KeyBackedSet::RangeResultType availableClusters = wait(ManagementClusterMetadata::clusterCapacityIndex.getRange( tr, {}, {}, CLIENT_KNOBS->METACLUSTER_ASSIGNMENT_CLUSTERS_TO_CHECK, Snapshot::False, Reverse::True)); @@ -1013,12 +1133,15 @@ struct CreateTenantImpl { wait(waitForAll(dataClusterDbs)); + // Check the availability of our set of clusters state std::vector> clusterAvailabilityChecks; for (int i = 0; i < availableClusters.results.size(); ++i) { clusterAvailabilityChecks.push_back( checkClusterAvailability(dataClusterDbs[i].get(), availableClusters.results[i].getString(1))); } + // Wait for a successful availability check from some cluster. We prefer the most full cluster, but if it + // doesn't return quickly we may choose another. Optional clusterAvailabilityCheck = wait(timeout( success(clusterAvailabilityChecks[0]) || (delay(CLIENT_KNOBS->METACLUSTER_ASSIGNMENT_FIRST_CHOICE_DELAY) && waitForAny(clusterAvailabilityChecks)), @@ -1029,6 +1152,7 @@ struct CreateTenantImpl { throw transaction_too_old(); } + // Get the first cluster that was available state Optional chosenCluster; for (auto f : clusterAvailabilityChecks) { if (f.isReady()) { @@ -1041,156 +1165,103 @@ struct CreateTenantImpl { return std::make_pair(chosenCluster.get(), false); } - ACTOR static Future> managementClusterCreateTenant( - CreateTenantImpl* self, - Reference tr) { - ASSERT(self->tenantEntry.assignedCluster.present()); - ASSERT(self->tenantEntry.id >= 0); - - if (self->tenantName.startsWith("\xff"_sr)) { - throw invalid_tenant_name(); + ACTOR static Future assignTenantAndStoreInManagementCluster(CreateTenantImpl* self, + Reference tr) { + // If the tenant already exists, we either throw an error from this function or move on to the next phase + bool tenantExists = wait(checkForExistingTenant(self, tr)); + if (tenantExists) { + return Void(); } - state Future> existingEntryFuture = tryGetTenantTransaction(tr, self->tenantName); - Optional existingEntry = wait(existingEntryFuture); - // If we already have a tenant creation entry, then we don't need to update the map unless we are trying to - // replace the existing one. Only replace the existing one if its ID matches the ID we are trying to replace and - // it is in the REGISTERING phase. - if (existingEntry.present() && (!self->replaceExistingTenantId.present() || - existingEntry.get().id != self->replaceExistingTenantId.get() || - existingEntry.get().tenantState != TenantState::REGISTERING)) { - return std::make_pair(existingEntry.get(), false); - } + // Choose a cluster for the tenant + state std::pair assignment = wait(assignTenant(self, tr)); + self->tenantEntry.assignedCluster = assignment.first; + + // Update the context with the chosen cluster + state Future setClusterFuture = self->ctx.setCluster(tr, assignment.first); + + // Create a tenant entry in the management cluster + Optional lastId = wait(ManagementClusterMetadata::tenantMetadata.lastTenantId.get(tr)); + self->tenantEntry.setId(lastId.orDefault(-1) + 1); + ManagementClusterMetadata::tenantMetadata.lastTenantId.set(tr, self->tenantEntry.id); self->tenantEntry.tenantState = TenantState::REGISTERING; ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->tenantEntry); - if (!existingEntry.present()) { - ManagementClusterMetadata::tenantMetadata.tenantCount.atomicOp(tr, 1, MutationRef::AddValue); - ManagementClusterMetadata::clusterTenantCount.atomicOp( - tr, self->tenantEntry.assignedCluster.get(), 1, MutationRef::AddValue); + ManagementClusterMetadata::tenantMetadata.tenantCount.atomicOp(tr, 1, MutationRef::AddValue); + ManagementClusterMetadata::clusterTenantCount.atomicOp( + tr, self->tenantEntry.assignedCluster.get(), 1, MutationRef::AddValue); - int64_t clusterTenantCount = wait(ManagementClusterMetadata::clusterTenantCount.getD( - tr, self->tenantEntry.assignedCluster.get(), Snapshot::False, 0)); + int64_t clusterTenantCount = wait(ManagementClusterMetadata::clusterTenantCount.getD( + tr, self->tenantEntry.assignedCluster.get(), Snapshot::False, 0)); - if (clusterTenantCount > CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER) { - throw cluster_no_capacity(); - } + if (clusterTenantCount > CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER) { + throw cluster_no_capacity(); } - return std::make_pair(self->tenantEntry, true); - } + // Updated indexes to include the new tenant + ManagementClusterMetadata::clusterTenantIndex.insert( + tr, Tuple::makeTuple(self->tenantEntry.assignedCluster.get(), self->tenantName)); - ACTOR static Future assignTenantAndStoreInManagementCluster(CreateTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + wait(setClusterFuture); - state Future> assignmentFuture = assignTenant(self, tr); + managementClusterAddTenantToGroup( + tr, self->tenantName, self->tenantEntry, self->ctx.dataClusterMetadata.get(), assignment.second); - wait(success(TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT))); - - state std::pair assignment = wait(assignmentFuture); - self->tenantEntry.assignedCluster = assignment.first; - state Future dataClusterMetadataFuture = - getClusterTransaction(tr, assignment.first); - - Optional lastId = wait(ManagementClusterMetadata::tenantMetadata.lastTenantId.get(tr)); - self->tenantEntry.setId(lastId.orDefault(-1) + 1); - - state std::pair createResult = wait(managementClusterCreateTenant(self, tr)); - state TenantMapEntry createdEntry = createResult.first; - - DataClusterMetadata _clusterMetadata = wait(dataClusterMetadataFuture); - self->clusterMetadata = _clusterMetadata; - - if (!createResult.second) { - if (!createdEntry.matchesConfiguration(self->tenantEntry) || - createdEntry.tenantState != TenantState::REGISTERING) { - throw tenant_already_exists(); - } else if (self->tenantEntry.assignedCluster != createdEntry.assignedCluster) { - ASSERT(createdEntry.assignedCluster.present()); - - DataClusterMetadata actualMetadata = - wait(getClusterTransaction(tr, createdEntry.assignedCluster.get())); - - self->clusterMetadata = actualMetadata; - } - } else { - ManagementClusterMetadata::clusterTenantIndex.insert( - tr, Tuple::makeTuple(createdEntry.assignedCluster.get(), self->tenantName)); - - managementClusterAddTenantToGroup( - tr, self->tenantName, createdEntry, self->clusterMetadata, assignment.second); - - ManagementClusterMetadata::tenantMetadata.lastTenantId.set(tr, createdEntry.id); - wait(buggifiedCommit(tr, BUGGIFY)); - } - - self->tenantEntry = createdEntry; - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } - } + return Void(); } // Returns true if the tenant creation should continue - ACTOR static Future storeTenantInDataCluster(CreateTenantImpl* self) { - state Reference dataClusterDb = wait(openDatabase(self->clusterMetadata.connectionString)); - Optional dataClusterTenant = wait( - TenantAPI::createTenant(dataClusterDb, self->tenantName, self->tenantEntry, ClusterType::METACLUSTER_DATA)); + ACTOR static Future storeTenantInDataCluster(CreateTenantImpl* self, Reference tr) { + std::pair, bool> dataClusterTenant = wait( + TenantAPI::createTenantTransaction(tr, self->tenantName, self->tenantEntry, ClusterType::METACLUSTER_DATA)); - return dataClusterTenant.present(); + return dataClusterTenant.first.present(); } - ACTOR static Future markTenantReady(CreateTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future tenantModeCheck = - TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT); - state Optional managementEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - wait(tenantModeCheck); - if (!managementEntry.present()) { - throw tenant_removed(); - } else if (managementEntry.get().id != self->tenantEntry.id) { - throw tenant_already_exists(); - } + ACTOR static Future markTenantReady(CreateTenantImpl* self, Reference tr) { + state Optional managementEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); + if (!managementEntry.present()) { + throw tenant_removed(); + } else if (managementEntry.get().id != self->tenantEntry.id) { + throw tenant_already_exists(); + } - if (managementEntry.get().tenantState == TenantState::REGISTERING) { - TenantMapEntry updatedEntry = managementEntry.get(); - updatedEntry.tenantState = TenantState::READY; - ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, updatedEntry); - wait(buggifiedCommit(tr, BUGGIFY)); - } - - break; - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } + if (managementEntry.get().tenantState == TenantState::REGISTERING) { + TenantMapEntry updatedEntry = managementEntry.get(); + updatedEntry.tenantState = TenantState::READY; + ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, updatedEntry); } return Void(); } ACTOR static Future run(CreateTenantImpl* self) { + if (self->tenantName.startsWith("\xff"_sr)) { + throw invalid_tenant_name(); + } + loop { - wait(assignTenantAndStoreInManagementCluster(self)); + wait(self->ctx.runManagementTransaction([self = self](Reference tr) { + return assignTenantAndStoreInManagementCluster(self, tr); + })); + self->replaceExistingTenantId = {}; try { - bool tenantStored = wait(storeTenantInDataCluster(self)); + bool tenantStored = wait(self->ctx.runDataClusterTransaction( + [self = self](Reference tr) { return storeTenantInDataCluster(self, tr); })); + if (tenantStored) { - wait(markTenantReady(self)); + wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return markTenantReady(self, tr); })); } return Void(); } catch (Error& e) { if (e.code() == error_code_tenant_creation_permanently_failed) { - // If the data cluster has permanently failed to create the tenant, then we can reassign it in the - // management cluster and start over + // If the data cluster has permanently failed to create the tenant, then we can reassign it in + // the management cluster and start over self->replaceExistingTenantId = self->tenantEntry.id; + self->ctx.clearCluster(); } else { throw; } @@ -1207,166 +1278,131 @@ Future createTenant(Reference db, TenantName name, TenantMapEntry tena return Void(); } -ACTOR template -Future managementClusterDeleteTenant(Transaction tr, TenantName name, int64_t tenantId) { - state Optional tenantEntry = wait(tryGetTenantTransaction(tr, name)); - - return Void(); -} - template struct DeleteTenantImpl { + MetaclusterOperationContext ctx; + // Initialization parameters - Reference managementDb; TenantName tenantName; // Parameters set in getAssignedLocation int64_t tenantId; - Reference dataClusterDb; - DeleteTenantImpl(Reference managementDb, TenantName tenantName) - : managementDb(managementDb), tenantName(tenantName) {} + DeleteTenantImpl(Reference managementDb, TenantName tenantName) : ctx(managementDb), tenantName(tenantName) {} - ACTOR static Future managementClusterEraseTenantEntry(Reference tr, - TenantName tenantName, - TenantMapEntry tenantEntry) { - // Erase the tenant entry itself - ManagementClusterMetadata::tenantMetadata.tenantMap.erase(tr, tenantName); + // Loads the cluster details for the cluster where the tenant is assigned. + // Returns true if the deletion is already in progress + ACTOR static Future getAssignedLocation(DeleteTenantImpl* self, Reference tr) { + state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - // This is idempotent because this function is only called if the tenant is in the map - ManagementClusterMetadata::tenantMetadata.tenantCount.atomicOp(tr, -1, MutationRef::AddValue); - ManagementClusterMetadata::clusterTenantCount.atomicOp( - tr, tenantEntry.assignedCluster.get(), -1, MutationRef::AddValue); + if (!tenantEntry.present()) { + throw tenant_not_found(); + } - // Clean up cluster based tenant indices and remove the tenant group if it is empty - state DataClusterMetadata clusterMetadata = wait(getClusterTransaction(tr, tenantEntry.assignedCluster.get())); + self->tenantId = tenantEntry.get().id; - // Remove the tenant from the cluster -> tenant index - ManagementClusterMetadata::clusterTenantIndex.erase( - tr, Tuple::makeTuple(tenantEntry.assignedCluster.get(), tenantName)); + wait(self->ctx.setCluster(tr, tenantEntry.get().assignedCluster.get())); + return tenantEntry.get().tenantState == TenantState::REMOVING; + } - // Remove the tenant from its tenant group - wait(managementClusterRemoveTenantFromGroup(tr, tenantName, tenantEntry, clusterMetadata)); + // Does an initial check if the tenant is empty. This is an optimization to prevent us marking a tenant + // in the deleted state while it has data, but it is still possible that data gets added to it after this + // point. + // + // SOMEDAY: should this also lock the tenant when locking is supported? + ACTOR static Future checkTenantEmpty(DeleteTenantImpl* self, Reference tr) { + state Optional tenantEntry = wait(TenantAPI::tryGetTenantTransaction(tr, self->tenantName)); + if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) { + // The tenant must have been removed simultaneously + return Void(); + } + + ThreadFuture rangeFuture = tr->getRange(prefixRange(tenantEntry.get().prefix), 1); + RangeResult result = wait(safeThreadFutureToFuture(rangeFuture)); + if (!result.empty()) { + throw tenant_not_empty(); + } return Void(); } - // Returns true if the deletion is already in progress - ACTOR static Future getAssignedLocation(DeleteTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future tenantModeCheck = - TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT); + // Mark the tenant as being in a removing state on the management cluster + ACTOR static Future markTenantInRemovingState(DeleteTenantImpl* self, + Reference tr) { + state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - wait(tenantModeCheck); - - if (!tenantEntry.present()) { - throw tenant_not_found(); - } - - self->tenantId = tenantEntry.get().id; - - DataClusterMetadata clusterMetadata = - wait(getClusterTransaction(tr, tenantEntry.get().assignedCluster.get())); - - Reference dataClusterDb = wait(openDatabase(clusterMetadata.connectionString)); - self->dataClusterDb = dataClusterDb; - - return tenantEntry.get().tenantState == TenantState::REMOVING; - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } + if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) { + // The tenant must have been removed simultaneously + return Void(); } + + if (tenantEntry.get().tenantState != TenantState::REMOVING) { + TenantMapEntry updatedEntry = tenantEntry.get(); + updatedEntry.tenantState = TenantState::REMOVING; + ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, updatedEntry); + Optional entryCheck = + wait(ManagementClusterMetadata::tenantMetadata.tenantMap.get(tr, self->tenantName)); + } + + return Void(); } - ACTOR static Future checkTenantEmpty(DeleteTenantImpl* self) { - state Reference dataTenant = self->dataClusterDb->openTenant(self->tenantName); - state Reference dataTr = dataTenant->createTransaction(); - loop { - try { - ThreadFuture rangeFuture = dataTr->getRange(normalKeys, 1); - RangeResult result = wait(safeThreadFutureToFuture(rangeFuture)); - if (!result.empty()) { - throw tenant_not_empty(); - } - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(dataTr->onError(e))); - } + // Delete the tenant and related metadata on the management cluster + ACTOR static Future deleteTenantFromManagementCluster(DeleteTenantImpl* self, + Reference tr) { + state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); + + if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) { + return Void(); } - } - ACTOR static Future markTenantInRemovingState(DeleteTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future tenantModeCheck = - TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT); - state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - wait(tenantModeCheck); + ASSERT(tenantEntry.get().tenantState == TenantState::REMOVING); - if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) { - // The tenant must have been removed simultaneously - return Void(); - } + // Erase the tenant entry itself + ManagementClusterMetadata::tenantMetadata.tenantMap.erase(tr, self->tenantName); - if (tenantEntry.get().tenantState != TenantState::REMOVING) { - TenantMapEntry updatedEntry = tenantEntry.get(); - updatedEntry.tenantState = TenantState::REMOVING; - ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, updatedEntry); - wait(buggifiedCommit(tr, BUGGIFY)); - } + // This is idempotent because this function is only called if the tenant is in the map + ManagementClusterMetadata::tenantMetadata.tenantCount.atomicOp(tr, -1, MutationRef::AddValue); + ManagementClusterMetadata::clusterTenantCount.atomicOp( + tr, tenantEntry.get().assignedCluster.get(), -1, MutationRef::AddValue); - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } - } - } + // Clean up cluster based tenant indices and remove the tenant group if it is empty + state DataClusterMetadata clusterMetadata = + wait(getClusterTransaction(tr, tenantEntry.get().assignedCluster.get())); - ACTOR static Future deleteTenantFromManagementCluster(DeleteTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future tenantModeCheck = - TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT); - state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - wait(tenantModeCheck); + // Remove the tenant from the cluster -> tenant index + ManagementClusterMetadata::clusterTenantIndex.erase( + tr, Tuple::makeTuple(tenantEntry.get().assignedCluster.get(), self->tenantName)); - if (!tenantEntry.present() || tenantEntry.get().id != self->tenantId) { - return Void(); - } + // Remove the tenant from its tenant group + wait(managementClusterRemoveTenantFromGroup(tr, self->tenantName, tenantEntry.get(), clusterMetadata)); - ASSERT(tenantEntry.get().tenantState == TenantState::REMOVING); - - wait(managementClusterEraseTenantEntry(tr, self->tenantName, tenantEntry.get())); - wait(buggifiedCommit(tr, BUGGIFY)); - - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } - } + return Void(); } ACTOR static Future run(DeleteTenantImpl* self) { // Get information about the tenant and where it is assigned - bool deletionInProgress = wait(getAssignedLocation(self)); + bool deletionInProgress = wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return getAssignedLocation(self, tr); })); + if (!deletionInProgress) { - wait(checkTenantEmpty(self)); - wait(markTenantInRemovingState(self)); + wait(self->ctx.runDataClusterTransaction( + [self = self](Reference tr) { return checkTenantEmpty(self, tr); })); + + wait(self->ctx.runManagementTransaction([self = self](Reference tr) { + return markTenantInRemovingState(self, tr); + })); } // Delete tenant on the data cluster - wait(TenantAPI::deleteTenant( - self->dataClusterDb, self->tenantName, self->tenantId, ClusterType::METACLUSTER_DATA)); + wait(self->ctx.runDataClusterTransaction([self = self](Reference tr) { + return TenantAPI::deleteTenantTransaction( + tr, self->tenantName, self->tenantId, ClusterType::METACLUSTER_DATA); + })); - wait(deleteTenantFromManagementCluster(self)); + wait(self->ctx.runManagementTransaction([self = self](Reference tr) { + return deleteTenantFromManagementCluster(self, tr); + })); return Void(); } @@ -1415,19 +1451,19 @@ Future>> listTenants(Reference template struct ConfigureTenantImpl { + MetaclusterOperationContext ctx; + // Initialization parameters - Reference managementDb; TenantName tenantName; std::map, Optional> configurationParameters; // Parameters set in updateManagementCluster TenantMapEntry updatedEntry; - DataClusterMetadata clusterMetadata; ConfigureTenantImpl(Reference managementDb, TenantName tenantName, std::map, Optional> configurationParameters) - : managementDb(managementDb), tenantName(tenantName), configurationParameters(configurationParameters) {} + : ctx(managementDb), tenantName(tenantName), configurationParameters(configurationParameters) {} // This verifies that the tenant group can be changed, and if so it updates all of the tenant group data // structures. It does not update the TenantMapEntry stored in the tenant map. @@ -1445,13 +1481,14 @@ struct ConfigureTenantImpl { // Removing a tenant group is only possible if we have capacity for more groups on the current cluster else if (!desiredGroup.present()) { - if (!self->clusterMetadata.entry.hasCapacity()) { + if (!self->ctx.dataClusterMetadata.get().entry.hasCapacity()) { throw metacluster_no_capacity(); } - wait(managementClusterRemoveTenantFromGroup(tr, self->tenantName, tenantEntry, self->clusterMetadata)); + wait(managementClusterRemoveTenantFromGroup( + tr, self->tenantName, tenantEntry, self->ctx.dataClusterMetadata.get())); managementClusterAddTenantToGroup( - tr, self->tenantName, entryWithUpdatedGroup, self->clusterMetadata, false); + tr, self->tenantName, entryWithUpdatedGroup, self->ctx.dataClusterMetadata.get(), false); return Void(); } @@ -1460,20 +1497,23 @@ struct ConfigureTenantImpl { // If we are creating a new tenant group, we need to have capacity on the current cluster if (!tenantGroupEntry.present()) { - if (!self->clusterMetadata.entry.hasCapacity()) { + if (!self->ctx.dataClusterMetadata.get().entry.hasCapacity()) { throw metacluster_no_capacity(); } else if (tenantEntry.tenantGroup.present()) { - wait(managementClusterRemoveTenantFromGroup(tr, self->tenantName, tenantEntry, self->clusterMetadata)); + wait(managementClusterRemoveTenantFromGroup( + tr, self->tenantName, tenantEntry, self->ctx.dataClusterMetadata.get())); } managementClusterAddTenantToGroup( - tr, self->tenantName, entryWithUpdatedGroup, self->clusterMetadata, false); + tr, self->tenantName, entryWithUpdatedGroup, self->ctx.dataClusterMetadata.get(), false); return Void(); } // Moves between groups in the same cluster are freely allowed else if (tenantGroupEntry.get().assignedCluster == tenantEntry.assignedCluster) { - wait(managementClusterRemoveTenantFromGroup(tr, self->tenantName, tenantEntry, self->clusterMetadata)); - managementClusterAddTenantToGroup(tr, self->tenantName, entryWithUpdatedGroup, self->clusterMetadata, true); + wait(managementClusterRemoveTenantFromGroup( + tr, self->tenantName, tenantEntry, self->ctx.dataClusterMetadata.get())); + managementClusterAddTenantToGroup( + tr, self->tenantName, entryWithUpdatedGroup, self->ctx.dataClusterMetadata.get(), true); return Void(); } @@ -1484,123 +1524,77 @@ struct ConfigureTenantImpl { } // Updates the configuration in the management cluster and marks it as being in the UPDATING_CONFIGURATION state - ACTOR static Future updateManagementCluster(ConfigureTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future tenantModeCheck = - TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT); + ACTOR static Future updateManagementCluster(ConfigureTenantImpl* self, + Reference tr) { + state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - wait(tenantModeCheck); - - if (!tenantEntry.present()) { - throw tenant_not_found(); - } - - if (tenantEntry.get().tenantState != TenantState::READY && - tenantEntry.get().tenantState != TenantState::UPDATING_CONFIGURATION) { - throw invalid_tenant_state(); - } - - self->updatedEntry = tenantEntry.get(); - DataClusterMetadata _clusterMetadata = - wait(getClusterTransaction(tr, tenantEntry.get().assignedCluster.get())); - - self->clusterMetadata = _clusterMetadata; - self->updatedEntry.tenantState = TenantState::UPDATING_CONFIGURATION; - - state std::map, Optional>::iterator configItr; - for (configItr = self->configurationParameters.begin(); - configItr != self->configurationParameters.end(); - ++configItr) { - if (configItr->first == "tenant_group"_sr) { - wait(updateTenantGroup(self, tr, self->updatedEntry, configItr->second)); - } - self->updatedEntry.configure(configItr->first, configItr->second); - } - - ++self->updatedEntry.configurationSequenceNum; - ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->updatedEntry); - wait(buggifiedCommit(tr, BUGGIFY)); - - // If there is no assigned cluster, then we can terminate early - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } + if (!tenantEntry.present()) { + throw tenant_not_found(); } + + if (tenantEntry.get().tenantState != TenantState::READY && + tenantEntry.get().tenantState != TenantState::UPDATING_CONFIGURATION) { + throw invalid_tenant_state(); + } + + wait(self->ctx.setCluster(tr, tenantEntry.get().assignedCluster.get())); + + self->updatedEntry = tenantEntry.get(); + self->updatedEntry.tenantState = TenantState::UPDATING_CONFIGURATION; + + state std::map, Optional>::iterator configItr; + for (configItr = self->configurationParameters.begin(); configItr != self->configurationParameters.end(); + ++configItr) { + if (configItr->first == "tenant_group"_sr) { + wait(updateTenantGroup(self, tr, self->updatedEntry, configItr->second)); + } + self->updatedEntry.configure(configItr->first, configItr->second); + } + + ++self->updatedEntry.configurationSequenceNum; + ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->updatedEntry); + + return Void(); } // Updates the configuration in the data cluster - ACTOR static Future updateDataCluster(ConfigureTenantImpl* self) { - state Reference dataClusterDb = wait(openDatabase(self->clusterMetadata.connectionString)); - state Reference tr = dataClusterDb->createTransaction(); + ACTOR static Future updateDataCluster(ConfigureTenantImpl* self, Reference tr) { + state Optional tenantEntry = wait(TenantAPI::tryGetTenantTransaction(tr, self->tenantName)); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - - state Future> metaclusterRegistrationFuture = - MetaclusterMetadata::metaclusterRegistration.get(tr); - - state Optional tenantEntry = - wait(TenantAPI::tryGetTenantTransaction(tr, self->tenantName)); - state Optional metaclusterRegistration = - wait(metaclusterRegistrationFuture); - - if (!tenantEntry.present() || tenantEntry.get().id != self->updatedEntry.id || - tenantEntry.get().configurationSequenceNum >= self->updatedEntry.configurationSequenceNum || - !metaclusterRegistration.present() || - metaclusterRegistration.get().clusterType != ClusterType::METACLUSTER_DATA) { - // If the tenant or cluster isn't in the metacluster, it must have been concurrently removed - return Void(); - } - - self->updatedEntry.tenantState = TenantState::READY; - wait( - TenantAPI::configureTenantTransaction(tr, self->tenantName, tenantEntry.get(), self->updatedEntry)); - wait(buggifiedCommit(tr, BUGGIFY)); - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } + if (!tenantEntry.present() || tenantEntry.get().id != self->updatedEntry.id || + tenantEntry.get().configurationSequenceNum >= self->updatedEntry.configurationSequenceNum) { + // If the tenant isn't in the metacluster, it must have been concurrently removed + return Void(); } + + self->updatedEntry.tenantState = TenantState::READY; + wait(TenantAPI::configureTenantTransaction(tr, self->tenantName, tenantEntry.get(), self->updatedEntry)); + return Void(); } // Updates the tenant state in the management cluster to READY - ACTOR static Future markManagementTenantAsReady(ConfigureTenantImpl* self) { - state Reference tr = self->managementDb->createTransaction(); - loop { - try { - tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - state Future tenantModeCheck = - TenantAPI::checkTenantMode(tr, ClusterType::METACLUSTER_MANAGEMENT); + ACTOR static Future markManagementTenantAsReady(ConfigureTenantImpl* self, + Reference tr) { + state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - state Optional tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName)); - wait(tenantModeCheck); - - if (!tenantEntry.present() || tenantEntry.get().id != self->updatedEntry.id || - tenantEntry.get().tenantState != TenantState::UPDATING_CONFIGURATION || - tenantEntry.get().configurationSequenceNum > self->updatedEntry.configurationSequenceNum) { - return Void(); - } - - tenantEntry.get().tenantState = TenantState::READY; - ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->updatedEntry); - wait(buggifiedCommit(tr, BUGGIFY)); - return Void(); - } catch (Error& e) { - wait(safeThreadFutureToFuture(tr->onError(e))); - } + if (!tenantEntry.present() || tenantEntry.get().id != self->updatedEntry.id || + tenantEntry.get().tenantState != TenantState::UPDATING_CONFIGURATION || + tenantEntry.get().configurationSequenceNum > self->updatedEntry.configurationSequenceNum) { + return Void(); } + + tenantEntry.get().tenantState = TenantState::READY; + ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->updatedEntry); + return Void(); } ACTOR static Future run(ConfigureTenantImpl* self) { - wait(updateManagementCluster(self)); - wait(updateDataCluster(self)); - wait(markManagementTenantAsReady(self)); + wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return updateManagementCluster(self, tr); })); + wait(self->ctx.runDataClusterTransaction( + [self = self](Reference tr) { return updateDataCluster(self, tr); })); + wait(self->ctx.runManagementTransaction( + [self = self](Reference tr) { return markManagementTenantAsReady(self, tr); })); return Void(); } diff --git a/fdbclient/include/fdbclient/TenantManagement.actor.h b/fdbclient/include/fdbclient/TenantManagement.actor.h index 2ff9cd838b..dfe3e66e41 100644 --- a/fdbclient/include/fdbclient/TenantManagement.actor.h +++ b/fdbclient/include/fdbclient/TenantManagement.actor.h @@ -313,47 +313,47 @@ Future deleteTenantTransaction(Transaction tr, TenantMetadata::tenantGroupMap.erase(tr, tenantEntry.get().tenantGroup.get()); } } + } - if (clusterType == ClusterType::METACLUSTER_DATA) { - // In data clusters, we store a tombstone - state Future> latestTombstoneFuture = - TenantMetadata::tenantTombstones.getRange(tr, {}, {}, 1, Snapshot::False, Reverse::True); - state Optional cleanupData = wait(TenantMetadata::tombstoneCleanupData.get(tr)); - state Version transactionReadVersion = wait(safeThreadFutureToFuture(tr->getReadVersion())); + if (clusterType == ClusterType::METACLUSTER_DATA) { + // In data clusters, we store a tombstone + state Future> latestTombstoneFuture = + TenantMetadata::tenantTombstones.getRange(tr, {}, {}, 1, Snapshot::False, Reverse::True); + state Optional cleanupData = wait(TenantMetadata::tombstoneCleanupData.get(tr)); + state Version transactionReadVersion = wait(safeThreadFutureToFuture(tr->getReadVersion())); - // If it has been long enough since we last cleaned up the tenant tombstones, we do that first - if (!cleanupData.present() || cleanupData.get().nextTombstoneEraseVersion <= transactionReadVersion) { - state int64_t deleteThroughId = cleanupData.present() ? cleanupData.get().nextTombstoneEraseId : -1; - // Delete all tombstones up through the one currently marked in the cleanup data - if (deleteThroughId >= 0) { - TenantMetadata::tenantTombstones.erase(tr, 0, deleteThroughId + 1); - } + // If it has been long enough since we last cleaned up the tenant tombstones, we do that first + if (!cleanupData.present() || cleanupData.get().nextTombstoneEraseVersion <= transactionReadVersion) { + state int64_t deleteThroughId = cleanupData.present() ? cleanupData.get().nextTombstoneEraseId : -1; + // Delete all tombstones up through the one currently marked in the cleanup data + if (deleteThroughId >= 0) { + TenantMetadata::tenantTombstones.erase(tr, 0, deleteThroughId + 1); + } - KeyBackedRangeResult latestTombstone = wait(latestTombstoneFuture); - int64_t nextDeleteThroughId = std::max(deleteThroughId, tenantId.get()); - if (!latestTombstone.results.empty()) { - nextDeleteThroughId = std::max(nextDeleteThroughId, latestTombstone.results[0]); - } + KeyBackedRangeResult latestTombstone = wait(latestTombstoneFuture); + int64_t nextDeleteThroughId = std::max(deleteThroughId, tenantId.get()); + if (!latestTombstone.results.empty()) { + nextDeleteThroughId = std::max(nextDeleteThroughId, latestTombstone.results[0]); + } - // The next cleanup will happen at or after TENANT_TOMBSTONE_CLEANUP_INTERVAL seconds have elapsed and - // will clean up tombstones through the most recently allocated ID. - TenantTombstoneCleanupData updatedCleanupData; - updatedCleanupData.tombstonesErasedThrough = deleteThroughId; - updatedCleanupData.nextTombstoneEraseId = nextDeleteThroughId; - updatedCleanupData.nextTombstoneEraseVersion = - transactionReadVersion + - CLIENT_KNOBS->TENANT_TOMBSTONE_CLEANUP_INTERVAL * CLIENT_KNOBS->VERSIONS_PER_SECOND; + // The next cleanup will happen at or after TENANT_TOMBSTONE_CLEANUP_INTERVAL seconds have elapsed and + // will clean up tombstones through the most recently allocated ID. + TenantTombstoneCleanupData updatedCleanupData; + updatedCleanupData.tombstonesErasedThrough = deleteThroughId; + updatedCleanupData.nextTombstoneEraseId = nextDeleteThroughId; + updatedCleanupData.nextTombstoneEraseVersion = + transactionReadVersion + + CLIENT_KNOBS->TENANT_TOMBSTONE_CLEANUP_INTERVAL * CLIENT_KNOBS->VERSIONS_PER_SECOND; - TenantMetadata::tombstoneCleanupData.set(tr, updatedCleanupData); + TenantMetadata::tombstoneCleanupData.set(tr, updatedCleanupData); - // If the tenant being deleted is within the tombstone window, record the tombstone - if (tenantId.get() > updatedCleanupData.tombstonesErasedThrough) { - TenantMetadata::tenantTombstones.insert(tr, tenantId.get()); - } - } else if (tenantId.get() > cleanupData.get().tombstonesErasedThrough) { - // If the tenant being deleted is within the tombstone window, record the tombstone + // If the tenant being deleted is within the tombstone window, record the tombstone + if (tenantId.get() > updatedCleanupData.tombstonesErasedThrough) { TenantMetadata::tenantTombstones.insert(tr, tenantId.get()); } + } else if (tenantId.get() > cleanupData.get().tombstonesErasedThrough) { + // If the tenant being deleted is within the tombstone window, record the tombstone + TenantMetadata::tenantTombstones.insert(tr, tenantId.get()); } } diff --git a/fdbserver/workloads/TenantManagementConcurrencyWorkload.actor.cpp b/fdbserver/workloads/TenantManagementConcurrencyWorkload.actor.cpp index a12c361b1f..af38fe6e6f 100644 --- a/fdbserver/workloads/TenantManagementConcurrencyWorkload.actor.cpp +++ b/fdbserver/workloads/TenantManagementConcurrencyWorkload.actor.cpp @@ -169,34 +169,22 @@ struct TenantManagementConcurrencyWorkload : TestWorkload { try { loop { - try { - Future createFuture = - self->useMetacluster - ? MetaclusterAPI::createTenant(self->mvDb, tenant, entry) - : success(TenantAPI::createTenant(self->dataDb.getReference(), tenant, entry)); - Optional result = wait(timeout(createFuture, 30)); - if (result.present()) { - break; - } - } catch (Error& e) { - // If we retried the creation after our initial attempt succeeded, then we proceed with the rest - // of the creation steps normally. Otherwise, the creation happened elsewhere and we failed - // here, so we can rethrow the error. - if (e.code() == error_code_tenant_already_exists) { - break; - } else if (e.code() == error_code_tenant_removed) { - ASSERT(self->useMetacluster); - break; - } else { - throw; - } + Future createFuture = + self->useMetacluster ? MetaclusterAPI::createTenant(self->mvDb, tenant, entry) + : success(TenantAPI::createTenant(self->dataDb.getReference(), tenant, entry)); + Optional result = wait(timeout(createFuture, 30)); + if (result.present()) { + break; } } return Void(); } catch (Error& e) { - if (e.code() != error_code_tenant_already_exists && e.code() != error_code_cluster_no_capacity) { + if (e.code() == error_code_tenant_removed) { + ASSERT(self->useMetacluster); + } else if (e.code() != error_code_tenant_already_exists && e.code() != error_code_cluster_no_capacity) { TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant); + ASSERT(false); } return Void(); @@ -208,24 +196,13 @@ struct TenantManagementConcurrencyWorkload : TestWorkload { try { loop { - try { - Future deleteFuture = self->useMetacluster - ? MetaclusterAPI::deleteTenant(self->mvDb, tenant) - : TenantAPI::deleteTenant(self->dataDb.getReference(), tenant); - Optional result = wait(timeout(deleteFuture, 30)); + Future deleteFuture = self->useMetacluster + ? MetaclusterAPI::deleteTenant(self->mvDb, tenant) + : TenantAPI::deleteTenant(self->dataDb.getReference(), tenant); + Optional result = wait(timeout(deleteFuture, 30)); - if (result.present()) { - break; - } - } catch (Error& e) { - // If we retried the deletion after our initial attempt succeeded, then we proceed with the - // rest of the deletion steps normally. Otherwise, the deletion happened elsewhere and we - // failed here, so we can rethrow the error. - if (e.code() == error_code_tenant_not_found) { - break; - } else { - throw; - } + if (result.present()) { + break; } } diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index 9281485038..f2bea47160 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -250,6 +250,7 @@ ERROR( metacluster_no_capacity, 2157, "Metacluster does not have capacity to cre ERROR( management_cluster_invalid_access, 2158, "Standard transactions cannot be run against the management cluster" ) ERROR( tenant_cannot_be_moved, 2159, "The tenant cannot be moved between clusters" ) ERROR( tenant_creation_permanently_failed, 2160, "The tenant creation did not complete in a timely manner and has permanently failed" ) +ERROR( cluster_locked, 2161, "The cluster has been locked" ) // 2200 - errors from bindings and official APIs ERROR( api_version_unset, 2200, "API version is not set" )