restructure updater code and add capacity check in metacluster management workload

This commit is contained in:
Jon Fu 2022-09-29 16:24:02 -07:00
parent 9d5879d2f5
commit 461e42bfe1
7 changed files with 57 additions and 34 deletions

View File

@ -447,6 +447,8 @@ Creates a new tenant in the cluster.
``TENANT_GROUP`` - The tenant group the tenant will be placed in.
``CLUSTER_NAME`` - The cluster the tenant will be placed in (metacluster only). If unspecified, the metacluster will choose the cluster.
delete
^^^^^^

View File

@ -606,9 +606,9 @@ std::vector<const char*> tenantHintGenerator(std::vector<StringRef> const& token
if (tokens.size() == 1) { if (tokens.size() == 1) {
return { "<create|delete|list|get|configure|rename>", "[ARGS]" }; return { "<create|delete|list|get|configure|rename>", "[ARGS]" };
} else if (tokencmp(tokens[1], "create") && tokens.size() < 5) { } else if (tokencmp(tokens[1], "create") && tokens.size() < 5) {
static std::vector<const char*> opts = { static std::vector<const char*> opts = { "<NAME>",
"<NAME> [tenant_group=<TENANT_GROUP>] [assigned_cluster=<CLUSTER_NAME>]" "[tenant_group=<TENANT_GROUP>]",
}; "[assigned_cluster=<CLUSTER_NAME>]" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end()); return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "delete") && tokens.size() < 3) { } else if (tokencmp(tokens[1], "delete") && tokens.size() < 3) {
static std::vector<const char*> opts = { "<NAME>" }; static std::vector<const char*> opts = { "<NAME>" };

View File

@ -27,7 +27,7 @@
namespace MetaclusterAPI { namespace MetaclusterAPI {
std::pair<ClusterUsage, ClusterUsage> metaclusterCapacity(std::map<ClusterName, DataClusterMetadata> clusters) { std::pair<ClusterUsage, ClusterUsage> metaclusterCapacity(std::map<ClusterName, DataClusterMetadata> const& clusters) {
ClusterUsage tenantGroupCapacity; ClusterUsage tenantGroupCapacity;
ClusterUsage tenantGroupsAllocated; ClusterUsage tenantGroupsAllocated;
for (auto cluster : clusters) { for (auto cluster : clusters) {

View File

@ -116,7 +116,7 @@ struct ManagementClusterMetadata {
}; };
// Helper function to compute metacluster capacity by passing the result of MetaclusterAPI::listClusters // Helper function to compute metacluster capacity by passing the result of MetaclusterAPI::listClusters
std::pair<ClusterUsage, ClusterUsage> metaclusterCapacity(std::map<ClusterName, DataClusterMetadata> clusters); std::pair<ClusterUsage, ClusterUsage> metaclusterCapacity(std::map<ClusterName, DataClusterMetadata> const& clusters);
ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connectionString); ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connectionString);

View File

@ -2687,36 +2687,51 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
} }
// Background actor that periodically refreshes self->db.metaclusterMetrics.
//
// The refresh timer is armed (60 seconds) only while self->db.clusterType is
// METACLUSTER_MANAGEMENT; for any other cluster type it is Never(), so the actor
// simply waits for serverInfo to change. Every serverInfo change re-evaluates the
// timer against the current cluster type.
//
// When the timer fires, one transaction reads the data cluster list and the tenant
// count concurrently, derives the tenant group capacity/allocation totals with
// MetaclusterAPI::metaclusterCapacity, stores the result in self->db.metaclusterMetrics
// and emits a "MetaclusterCapacity" trace event.
ACTOR Future<Void> metaclusterMetricsUpdater(ClusterControllerData* self) {
	state Future<Void> updaterDelay =
	    self->db.clusterType == ClusterType::METACLUSTER_MANAGEMENT ? delay(60.0) : Never();
	loop choose {
		when(wait(self->db.serverInfo->onChange())) {
			// The cluster type may have changed; re-evaluate whether updates should run at all.
			updaterDelay = self->db.clusterType == ClusterType::METACLUSTER_MANAGEMENT ? delay(60.0) : Never();
		}
		when(wait(updaterDelay)) {
			state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->cx);
			loop {
				try {
					tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
					state std::map<ClusterName, DataClusterMetadata> clusters;
					state int64_t tenantCount;
					// Issue both reads in the same transaction and wait for them together.
					wait(store(clusters,
					           MetaclusterAPI::listClustersTransaction(
					               tr, ""_sr, "\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS)) &&
					     store(tenantCount,
					           MetaclusterAPI::ManagementClusterMetadata::tenantMetadata().tenantCount.getD(
					               tr, Snapshot::False, 0)));
					state std::pair<ClusterUsage, ClusterUsage> capacityNumbers =
					    MetaclusterAPI::metaclusterCapacity(clusters);

					// first = total tenant group capacity, second = tenant groups allocated
					MetaclusterMetrics metrics;
					metrics.numTenants = tenantCount;
					metrics.numDataClusters = clusters.size();
					metrics.tenantGroupCapacity = capacityNumbers.first.numTenantGroups;
					metrics.tenantGroupsAllocated = capacityNumbers.second.numTenantGroups;
					self->db.metaclusterMetrics = metrics;
					TraceEvent("MetaclusterCapacity")
					    .detail("DataClusters", self->db.metaclusterMetrics.numDataClusters)
					    .detail("TenantGroupCapacity", self->db.metaclusterMetrics.tenantGroupCapacity)
					    .detail("TenantGroupsAllocated", self->db.metaclusterMetrics.tenantGroupsAllocated);
					break;
				} catch (Error& e) {
					TraceEvent("MetaclusterUpdaterError").error(e);
					// Cluster can change types during/before a metacluster transaction
					// and throw an error due to timing issues.
					// In such cases, go back to choose loop instead of retrying
					if (e.code() == error_code_invalid_metacluster_operation) {
						break;
					}
					wait(tr->onError(e));
				}
			}
			// Re-arm the timer. Without this the already-fired future stays ready and the
			// choose loop would re-enter this branch immediately on every iteration,
			// turning the once-a-minute updater into a back-to-back transaction loop.
			updaterDelay = self->db.clusterType == ClusterType::METACLUSTER_MANAGEMENT ? delay(60.0) : Never();
		}
	}
}

View File

@ -1168,8 +1168,8 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
Optional<UID> metaclusterId; Optional<UID> metaclusterId;
Optional<ClusterName> clusterName; Optional<ClusterName> clusterName;
Optional<UID> clusterId; Optional<UID> clusterId;
self->controllerData->db.metaclusterRegistration = metaclusterRegistration;
if (metaclusterRegistration.present()) { if (metaclusterRegistration.present()) {
self->controllerData->db.metaclusterRegistration = metaclusterRegistration.get();
self->controllerData->db.metaclusterName = metaclusterRegistration.get().metaclusterName; self->controllerData->db.metaclusterName = metaclusterRegistration.get().metaclusterName;
self->controllerData->db.clusterType = metaclusterRegistration.get().clusterType; self->controllerData->db.clusterType = metaclusterRegistration.get().clusterType;
metaclusterName = metaclusterRegistration.get().metaclusterName; metaclusterName = metaclusterRegistration.get().metaclusterName;
@ -1179,7 +1179,6 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
clusterId = metaclusterRegistration.get().id; clusterId = metaclusterRegistration.get().id;
} }
} else { } else {
self->controllerData->db.metaclusterRegistration = Optional<MetaclusterRegistrationEntry>();
self->controllerData->db.metaclusterName = Optional<ClusterName>(); self->controllerData->db.metaclusterName = Optional<ClusterName>();
self->controllerData->db.clusterType = ClusterType::STANDALONE; self->controllerData->db.clusterType = ClusterType::STANDALONE;
} }

View File

@ -906,18 +906,25 @@ struct MetaclusterManagementWorkload : TestWorkload {
std::map<ClusterName, DataClusterMetadata> dataClusters = wait(MetaclusterAPI::listClusters( std::map<ClusterName, DataClusterMetadata> dataClusters = wait(MetaclusterAPI::listClusters(
self->managementDb, ""_sr, "\xff\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS + 1)); self->managementDb, ""_sr, "\xff\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS + 1));
int totalTenantGroupsAllocated = 0;
std::vector<Future<Void>> dataClusterChecks; std::vector<Future<Void>> dataClusterChecks;
for (auto [clusterName, dataClusterData] : self->dataDbs) { for (auto [clusterName, dataClusterData] : self->dataDbs) {
auto dataClusterItr = dataClusters.find(clusterName); auto dataClusterItr = dataClusters.find(clusterName);
if (dataClusterData.registered) { if (dataClusterData.registered) {
ASSERT(dataClusterItr != dataClusters.end()); ASSERT(dataClusterItr != dataClusters.end());
ASSERT(dataClusterItr->second.entry.capacity.numTenantGroups == dataClusterData.tenantGroupCapacity); ASSERT(dataClusterItr->second.entry.capacity.numTenantGroups == dataClusterData.tenantGroupCapacity);
totalTenantGroupsAllocated +=
dataClusterData.tenantGroups.size() + dataClusterData.ungroupedTenants.size();
} else { } else {
ASSERT(dataClusterItr == dataClusters.end()); ASSERT(dataClusterItr == dataClusters.end());
} }
dataClusterChecks.push_back(checkDataCluster(self, clusterName, dataClusterData)); dataClusterChecks.push_back(checkDataCluster(self, clusterName, dataClusterData));
} }
auto capacityNumbers = MetaclusterAPI::metaclusterCapacity(dataClusters);
ASSERT(capacityNumbers.first.numTenantGroups == self->totalTenantGroupCapacity);
ASSERT(capacityNumbers.second.numTenantGroups == totalTenantGroupsAllocated);
wait(waitForAll(dataClusterChecks)); wait(waitForAll(dataClusterChecks));
wait(decommissionMetacluster(self)); wait(decommissionMetacluster(self));