Merge pull request #10047 from sfc-gh-ajbeamon/add-metacluster-version

Add a metacluster version to the MetaclusterRegistrationEntry and validate it when loading the entry
This commit is contained in:
A.J. Beamon 2023-05-01 12:32:37 -07:00 committed by GitHub
commit 85f5e206a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 674 additions and 125 deletions

View File

@ -529,7 +529,8 @@
"primary_dc_missing",
"fetch_primary_dc_timeout",
"fetch_storage_wiggler_stats_timeout",
"fetch_consistency_scan_info_timeout"
"fetch_consistency_scan_info_timeout",
"metacluster_metrics_missing"
]
},
"issues":[

View File

@ -32,9 +32,17 @@ std::string clusterTypeToString(const ClusterType& clusterType) {
return "unknown";
}
}
KeyBackedObjectProperty<MetaclusterRegistrationEntry, decltype(IncludeVersion())>&
metacluster::metadata::metaclusterRegistration() {
static KeyBackedObjectProperty<MetaclusterRegistrationEntry, decltype(IncludeVersion())> instance(
"\xff/metacluster/clusterRegistration"_sr, IncludeVersion());
return instance;
}
KeyBackedObjectProperty<UnversionedMetaclusterRegistrationEntry, decltype(IncludeVersion())>&
metacluster::metadata::unversionedMetaclusterRegistration() {
static KeyBackedObjectProperty<UnversionedMetaclusterRegistrationEntry, decltype(IncludeVersion())> instance(
"\xff/metacluster/clusterRegistration"_sr, IncludeVersion());
return instance;
}

View File

@ -583,7 +583,8 @@ const KeyRef JSONSchemas::statusSchema = R"statusSchema(
"primary_dc_missing",
"fetch_primary_dc_timeout",
"fetch_storage_wiggler_stats_timeout",
"fetch_consistency_scan_info_timeout"
"fetch_consistency_scan_info_timeout",
"metacluster_metrics_missing"
]
},
"issues":[

View File

@ -28,9 +28,33 @@
std::string clusterTypeToString(const ClusterType& clusterType);
struct MetaclusterRegistrationEntry {
enum class MetaclusterVersion {
// Smaller than any legal version; used for testing
BEGIN = 0,
// This is the smallest version of metacluster metadata understood by this version of FDB. It should be updated any
// time support for older versions is dropped. Our contract is that we will support at least one older version, but
// likely we will stop supporting versions older than that.
MIN_SUPPORTED = 1,
// The initial version used for metacluster metadata
V1 = 1,
// This is the largest version of metacluster metadata understood by this version of FDB. It should be increased any
// time an FDB version adds a new metacluster version.
MAX_SUPPORTED = 1,
// Larger than any legal version; used for testing
END
};
template <bool Versioned>
struct MetaclusterRegistrationEntryImpl {
constexpr static FileIdentifier file_identifier = 13448589;
// Set to true to allow tests to write metacluster registrations that are invalid
static inline bool allowUnsupportedRegistrationWrites = false;
ClusterType clusterType;
ClusterName metaclusterName;
@ -38,20 +62,31 @@ struct MetaclusterRegistrationEntry {
UID metaclusterId;
UID id;
MetaclusterRegistrationEntry() = default;
MetaclusterRegistrationEntry(ClusterName metaclusterName, UID metaclusterId)
MetaclusterVersion version;
MetaclusterRegistrationEntryImpl() = default;
MetaclusterRegistrationEntryImpl(ClusterName metaclusterName, UID metaclusterId, MetaclusterVersion version)
: clusterType(ClusterType::METACLUSTER_MANAGEMENT), metaclusterName(metaclusterName), name(metaclusterName),
metaclusterId(metaclusterId), id(metaclusterId) {}
MetaclusterRegistrationEntry(ClusterName metaclusterName, ClusterName name, UID metaclusterId, UID id)
metaclusterId(metaclusterId), id(metaclusterId), version(version) {}
MetaclusterRegistrationEntryImpl(ClusterName metaclusterName,
ClusterName name,
UID metaclusterId,
UID id,
MetaclusterVersion version)
: clusterType(ClusterType::METACLUSTER_DATA), metaclusterName(metaclusterName), name(name),
metaclusterId(metaclusterId), id(id) {
metaclusterId(metaclusterId), id(id), version(version) {
ASSERT(metaclusterName != name && metaclusterId != id);
}
template <bool V>
MetaclusterRegistrationEntryImpl(MetaclusterRegistrationEntryImpl<V> other)
: clusterType(other.clusterType), metaclusterName(other.metaclusterName), name(other.name),
metaclusterId(other.metaclusterId), id(other.id), version(other.version) {}
// Returns true if this entry is associated with the same cluster as the passed in entry. If one entry is from the
// management cluster and the other is from a data cluster, this checks whether they are part of the same
// metacluster.
bool matches(MetaclusterRegistrationEntry const& other) const {
bool matches(MetaclusterRegistrationEntryImpl const& other) const {
if (metaclusterName != other.metaclusterName || metaclusterId != other.metaclusterId) {
return false;
} else if (clusterType == ClusterType::METACLUSTER_DATA && other.clusterType == ClusterType::METACLUSTER_DATA &&
@ -62,58 +97,81 @@ struct MetaclusterRegistrationEntry {
return true;
}
MetaclusterRegistrationEntry toManagementClusterRegistration() const {
MetaclusterRegistrationEntryImpl toManagementClusterRegistration() const {
ASSERT(clusterType == ClusterType::METACLUSTER_DATA);
return MetaclusterRegistrationEntry(metaclusterName, metaclusterId);
return MetaclusterRegistrationEntryImpl(metaclusterName, metaclusterId, version);
}
MetaclusterRegistrationEntry toDataClusterRegistration(ClusterName name, UID id) const {
MetaclusterRegistrationEntryImpl toDataClusterRegistration(ClusterName name, UID id) const {
ASSERT(clusterType == ClusterType::METACLUSTER_MANAGEMENT);
return MetaclusterRegistrationEntry(metaclusterName, name, metaclusterId, id);
return MetaclusterRegistrationEntryImpl(metaclusterName, name, metaclusterId, id, version);
}
Value encode() const { return ObjectWriter::toValue(*this, IncludeVersion()); }
static MetaclusterRegistrationEntry decode(ValueRef const& value) {
return ObjectReader::fromStringRef<MetaclusterRegistrationEntry>(value, IncludeVersion());
static MetaclusterRegistrationEntryImpl decode(ValueRef const& value) {
return ObjectReader::fromStringRef<MetaclusterRegistrationEntryImpl>(value, IncludeVersion());
}
static Optional<MetaclusterRegistrationEntry> decode(Optional<Value> value) {
return value.map([](ValueRef const& v) { return MetaclusterRegistrationEntry::decode(v); });
static Optional<MetaclusterRegistrationEntryImpl> decode(Optional<Value> value) {
return value.map([](ValueRef const& v) { return MetaclusterRegistrationEntryImpl::decode(v); });
}
std::string toString() const {
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
return fmt::format(
"metacluster name: {}, metacluster id: {}", printable(metaclusterName), metaclusterId.shortString());
} else {
return fmt::format("metacluster name: {}, metacluster id: {}, data cluster name: {}, data cluster id: {}",
return fmt::format("metacluster name: {}, metacluster id: {}, version: {}",
printable(metaclusterName),
metaclusterId.shortString(),
printable(name),
id.shortString());
(int)version);
} else {
return fmt::format(
"metacluster name: {}, metacluster id: {}, data cluster name: {}, data cluster id: {}, version: {}",
printable(metaclusterName),
metaclusterId.shortString(),
printable(name),
id.shortString(),
(int)version);
}
}
bool operator==(MetaclusterRegistrationEntry const& other) const {
template <bool V>
bool operator==(MetaclusterRegistrationEntryImpl<V> const& other) const {
return clusterType == other.clusterType && metaclusterName == other.metaclusterName && name == other.name &&
metaclusterId == other.metaclusterId && id == other.id;
metaclusterId == other.metaclusterId && id == other.id && version == other.version;
}
bool operator!=(MetaclusterRegistrationEntry const& other) const { return !(*this == other); }
template <bool V>
bool operator!=(MetaclusterRegistrationEntryImpl<V> const& other) const {
return !(*this == other);
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, clusterType, metaclusterName, name, metaclusterId, id);
serializer(ar, clusterType, metaclusterName, name, metaclusterId, id, version);
if constexpr (Ar::isDeserializing && Versioned) {
if (version < MetaclusterVersion::MIN_SUPPORTED || version > MetaclusterVersion::MAX_SUPPORTED) {
throw unsupported_metacluster_version();
}
}
}
};
template <>
struct Traceable<MetaclusterRegistrationEntry> : std::true_type {
static std::string toString(MetaclusterRegistrationEntry const& entry) { return entry.toString(); }
template <bool Versioned>
struct Traceable<MetaclusterRegistrationEntryImpl<Versioned>> : std::true_type {
static std::string toString(MetaclusterRegistrationEntryImpl<Versioned> const& entry) { return entry.toString(); }
};
using MetaclusterRegistrationEntry = MetaclusterRegistrationEntryImpl<true>;
using UnversionedMetaclusterRegistrationEntry = MetaclusterRegistrationEntryImpl<false>;
// Registration information for a metacluster, stored on both management and data clusters
namespace metacluster::metadata {
// Use of this metacluster registration property will verify that the metacluster registration has a compatible version
// before using it. This should be the default choice for accessing the metacluster registration.
KeyBackedObjectProperty<MetaclusterRegistrationEntry, decltype(IncludeVersion())>& metaclusterRegistration();
// Use of this metacluster registration property will not verify that the metacluster registration has a compatible
// version. It should only be used in limited circumstances.
KeyBackedObjectProperty<UnversionedMetaclusterRegistrationEntry, decltype(IncludeVersion())>&
unversionedMetaclusterRegistration();
}; // namespace metacluster::metadata
#endif

View File

@ -747,19 +747,30 @@ private:
}
}
template <bool Versioned>
void reportMetaclusterRegistration(ValueRef value) {
MetaclusterRegistrationEntryImpl<Versioned> entry = MetaclusterRegistrationEntryImpl<Versioned>::decode(value);
TraceEvent("SetMetaclusterRegistration", dbgid)
.detail("ClusterType", entry.clusterType)
.detail("MetaclusterID", entry.metaclusterId)
.detail("MetaclusterName", entry.metaclusterName)
.detail("ClusterID", entry.id)
.detail("ClusterName", entry.name)
.detail("MetaclusterVersion", entry.version);
}
void checkSetMetaclusterRegistration(MutationRef m) {
if (m.param1 == metacluster::metadata::metaclusterRegistration().key) {
MetaclusterRegistrationEntry entry = MetaclusterRegistrationEntry::decode(m.param2);
TraceEvent("SetMetaclusterRegistration", dbgid)
.detail("ClusterType", entry.clusterType)
.detail("MetaclusterID", entry.metaclusterId)
.detail("MetaclusterName", entry.metaclusterName)
.detail("ClusterID", entry.id)
.detail("ClusterName", entry.name);
if (MetaclusterRegistrationEntry::allowUnsupportedRegistrationWrites) {
reportMetaclusterRegistration<false>(m.param2);
} else {
CODE_PROBE(true, "Writing metacluster registration with version validation");
reportMetaclusterRegistration<true>(m.param2);
}
Optional<Value> value =
txnStateStore->readValue(metacluster::metadata::metaclusterRegistration().key).get();
txnStateStore->readValue(metacluster::metadata::unversionedMetaclusterRegistration().key).get();
if (!initialCommit) {
txnStateStore->set(KeyValueRef(m.param1, m.param2));
}

View File

@ -2801,12 +2801,13 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
}
ACTOR Future<Void> metaclusterMetricsUpdater(ClusterControllerData* self) {
state Future<Void> updaterDelay = Void();
loop {
state Future<Void> updaterDelay =
self->db.clusterType == ClusterType::METACLUSTER_MANAGEMENT ? delay(60.0) : Never();
choose {
when(wait(self->db.serverInfo->onChange())) {}
when(wait(updaterDelay)) {
when(wait(self->db.serverInfo->onChange())) {
updaterDelay = Void();
}
when(wait(self->db.clusterType == ClusterType::METACLUSTER_MANAGEMENT ? updaterDelay : Never())) {
try {
wait(store(self->db.metaclusterMetrics,
metacluster::MetaclusterMetrics::getMetaclusterMetrics(self->cx)));
@ -2816,6 +2817,8 @@ ACTOR Future<Void> metaclusterMetricsUpdater(ClusterControllerData* self) {
throw;
}
}
updaterDelay = delay(60.0);
}
}
}

View File

@ -1202,8 +1202,8 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
Optional<Value> metaclusterRegistrationVal =
wait(self->txnStateStore->readValue(metacluster::metadata::metaclusterRegistration().key));
Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
MetaclusterRegistrationEntry::decode(metaclusterRegistrationVal);
Optional<UnversionedMetaclusterRegistrationEntry> metaclusterRegistration =
UnversionedMetaclusterRegistrationEntry::decode(metaclusterRegistrationVal);
Optional<ClusterName> metaclusterName;
Optional<UID> metaclusterId;
Optional<ClusterName> clusterName;

View File

@ -3028,7 +3028,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
std::vector<NetworkAddress> incompatibleConnections,
Version datacenterVersionDifference,
ConfigBroadcaster const* configBroadcaster,
Optional<MetaclusterRegistrationEntry> metaclusterRegistration,
Optional<UnversionedMetaclusterRegistrationEntry> metaclusterRegistration,
metacluster::MetaclusterMetrics metaclusterMetrics) {
state double tStart = timer();
@ -3375,12 +3375,18 @@ ACTOR Future<StatusReply> clusterGetStatus(
if (metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA) {
metacluster["data_cluster_name"] = metaclusterRegistration.get().name;
metacluster["data_cluster_id"] = metaclusterRegistration.get().id.toString();
} else { // clusterType == ClusterType::METACLUSTER_MANAGEMENT
} else if (!metaclusterMetrics.error.present()) { // clusterType == ClusterType::METACLUSTER_MANAGEMENT
metacluster["num_data_clusters"] = metaclusterMetrics.numDataClusters;
tenants["num_tenants"] = metaclusterMetrics.numTenants;
tenants["tenant_group_capacity"] = metaclusterMetrics.tenantGroupCapacity;
tenants["tenant_groups_allocated"] = metaclusterMetrics.tenantGroupsAllocated;
} else {
messages.push_back(JsonString::makeMessage(
"metacluster_metrics_missing",
fmt::format("Failed to fetch metacluster metrics: {}.", metaclusterMetrics.error.get())
.c_str()));
}
} else {
metacluster["cluster_type"] = clusterTypeToString(ClusterType::STANDALONE);
}

View File

@ -149,7 +149,7 @@ public:
AsyncVar<bool> blobRestoreEnabled;
ClusterType clusterType = ClusterType::STANDALONE;
Optional<ClusterName> metaclusterName;
Optional<MetaclusterRegistrationEntry> metaclusterRegistration;
Optional<UnversionedMetaclusterRegistrationEntry> metaclusterRegistration;
metacluster::MetaclusterMetrics metaclusterMetrics;
DBInfo()

View File

@ -53,7 +53,7 @@ Future<StatusReply> clusterGetStatus(
std::vector<NetworkAddress> const& incompatibleConnections,
Version const& datacenterVersionDifference,
ConfigBroadcaster const* const& conifgBroadcaster,
Optional<MetaclusterRegistrationEntry> const& metaclusterRegistration,
Optional<UnversionedMetaclusterRegistrationEntry> const& metaclusterRegistration,
metacluster::MetaclusterMetrics const& metaclusterMetrics);
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};

View File

@ -48,6 +48,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
struct TenantTestData : ReferenceCounted<TenantTestData> {
ClusterName cluster;
Optional<TenantGroupName> tenantGroup;
Optional<UID> lockId;
TenantAPI::TenantLockState lockState = TenantAPI::TenantLockState::UNLOCKED;
TenantTestData() {}
TenantTestData(ClusterName cluster, Optional<TenantGroupName> tenantGroup)
@ -67,6 +69,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
bool registered = false;
bool detached = false;
int tenantGroupCapacity = 0;
MetaclusterVersion version;
std::map<TenantName, Reference<TenantTestData>> tenants;
std::map<TenantGroupName, Reference<TenantGroupData>> tenantGroups;
@ -76,7 +79,11 @@ struct MetaclusterManagementWorkload : TestWorkload {
DataClusterData(Database db) : db(db) {}
};
bool metaclusterCreated;
Reference<IDatabase> managementDb;
MetaclusterVersion managementVersion;
std::map<ClusterName, Reference<DataClusterData>> dataDbs;
std::map<TenantGroupName, Reference<TenantGroupData>> tenantGroups;
std::set<TenantName> ungroupedTenants;
@ -87,17 +94,30 @@ struct MetaclusterManagementWorkload : TestWorkload {
int maxTenants;
int maxTenantGroups;
int64_t tenantIdPrefix;
bool allowTenantIdPrefixReuse;
int64_t tenantIdPrefix = -1;
std::set<int64_t> usedPrefixes;
double testDuration;
MetaclusterManagementWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
MetaclusterManagementWorkload(WorkloadContext const& wcx)
: TestWorkload(wcx), metaclusterCreated(deterministicRandom()->coinflip()) {
maxTenants = std::min<int>(1e8 - 1, getOption(options, "maxTenants"_sr, 1000));
maxTenantGroups = std::min<int>(2 * maxTenants, getOption(options, "maxTenantGroups"_sr, 20));
testDuration = getOption(options, "testDuration"_sr, 120.0);
tenantIdPrefix = getOption(options,
"tenantIdPrefix"_sr,
deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1));
allowTenantIdPrefixReuse = deterministicRandom()->coinflip();
MetaclusterRegistrationEntry::allowUnsupportedRegistrationWrites = true;
}
Optional<int64_t> generateTenantIdPrefix() {
for (int i = 0; i < 20; ++i) {
int64_t newPrefix = deterministicRandom()->randomInt(TenantAPI::TENANT_ID_PREFIX_MIN_VALUE,
TenantAPI::TENANT_ID_PREFIX_MAX_VALUE + 1);
if (allowTenantIdPrefixReuse || !usedPrefixes.count(newPrefix)) {
return newPrefix;
}
}
return false;
}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("Attrition"); }
@ -114,21 +134,32 @@ struct MetaclusterManagementWorkload : TestWorkload {
}
}
ACTOR static Future<Void> _setup(Database cx, MetaclusterManagementWorkload* self) {
Reference<IDatabase> threadSafeHandle =
wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(cx)));
MultiVersionApi::api->selectApiVersion(cx->apiVersion.version());
self->managementDb = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
ASSERT(g_simulator->extraDatabases.size() > 0);
for (auto connectionString : g_simulator->extraDatabases) {
ClusterConnectionString ccs(connectionString);
self->dataDbIndex.push_back(ClusterName(format("cluster_%08d", self->dataDbs.size())));
self->dataDbs[self->dataDbIndex.back()] = makeReference<DataClusterData>(
Database::createSimulatedExtraDatabase(connectionString, cx->defaultTenant));
if (self->metaclusterCreated) {
self->tenantIdPrefix = self->generateTenantIdPrefix().get();
self->usedPrefixes.insert(self->tenantIdPrefix);
}
wait(success(
metacluster::createMetacluster(cx.getReference(), "management_cluster"_sr, self->tenantIdPrefix, false)));
metacluster::util::SimulatedMetacluster simMetacluster = wait(metacluster::util::createSimulatedMetacluster(
cx,
self->tenantIdPrefix,
Optional<metacluster::DataClusterEntry>(),
metacluster::util::SkipMetaclusterCreation(!self->metaclusterCreated)));
self->managementDb = simMetacluster.managementDb;
for (auto const& [name, db] : simMetacluster.dataDbs) {
self->dataDbIndex.push_back(name);
self->dataDbs[name] = makeReference<DataClusterData>(db);
}
if (self->metaclusterCreated) {
Optional<MetaclusterRegistrationEntry> registration =
wait(metacluster::metadata::metaclusterRegistration().get(self->managementDb));
ASSERT(registration.present());
self->managementVersion = registration.get().version;
}
return Void();
}
@ -149,6 +180,134 @@ struct MetaclusterManagementWorkload : TestWorkload {
return tenantGroup;
}
bool isValidVersion(Optional<Reference<DataClusterData>> dataDb = {}) {
bool managementValid = managementVersion >= MetaclusterVersion::MIN_SUPPORTED &&
managementVersion <= MetaclusterVersion::MAX_SUPPORTED;
bool dataValid = !dataDb.present() || (dataDb.get()->version >= MetaclusterVersion::MIN_SUPPORTED &&
dataDb.get()->version <= MetaclusterVersion::MAX_SUPPORTED);
return managementValid && dataValid;
}
ACTOR template <class DB>
static Future<Void> setMetaclusterVersion(Reference<DB> db, MetaclusterVersion version) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state Optional<UnversionedMetaclusterRegistrationEntry> registration =
wait(metacluster::metadata::unversionedMetaclusterRegistration().get(tr));
ASSERT(registration.present());
if (registration.get().version == version) {
return Void();
}
MetaclusterRegistrationEntry newRegistration(registration.get());
newRegistration.version = version;
metacluster::metadata::metaclusterRegistration().set(tr, newRegistration);
wait(safeThreadFutureToFuture(tr->commit()));
// The metacluster registration entry causes a recovery, so it cannot succeed
ASSERT(false);
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
Future<Void> checkAndResetMetaclusterVersion(ClusterName clusterName) {
auto dataDbItr = dataDbs.find(clusterName);
ASSERT(dataDbItr != dataDbs.end());
ASSERT(!isValidVersion(dataDbItr->second));
dataDbItr->second->version = (MetaclusterVersion)deterministicRandom()->randomInt(
(int)MetaclusterVersion::MIN_SUPPORTED, (int)MetaclusterVersion::MAX_SUPPORTED + 1);
return setMetaclusterVersion(dataDbItr->second->db.getReference(), dataDbItr->second->version);
}
ACTOR static Future<Void> createMetacluster(MetaclusterManagementWorkload* self) {
state ClusterName clusterName = "metacluster"_sr;
if (deterministicRandom()->random01() < 0.01) {
clusterName = clusterName.withPrefix("\xff"_sr);
}
try {
state Optional<int64_t> newTenantIdPrefix = self->generateTenantIdPrefix();
if (!newTenantIdPrefix.present()) {
return Void();
}
Optional<std::string> result =
wait(metacluster::createMetacluster(self->managementDb, clusterName, newTenantIdPrefix.get(), false));
ASSERT(!clusterName.startsWith("\xff"_sr));
if (!result.present()) {
ASSERT(!self->metaclusterCreated);
self->metaclusterCreated = true;
self->tenantIdPrefix = newTenantIdPrefix.get();
self->usedPrefixes.insert(self->tenantIdPrefix);
Optional<MetaclusterRegistrationEntry> registrationEntry =
wait(metacluster::metadata::metaclusterRegistration().get(self->managementDb));
ASSERT(registrationEntry.present());
self->managementVersion = registrationEntry.get().version;
} else {
ASSERT(self->metaclusterCreated);
}
} catch (Error& e) {
if (e.code() == error_code_invalid_cluster_name) {
ASSERT(clusterName.startsWith("\xff"_sr));
return Void();
} else if (e.code() == error_code_unsupported_metacluster_version) {
ASSERT(!self->isValidVersion());
return Void();
}
TraceEvent(SevError, "CreateMetaclusterFailure").error(e).detail("ClusterName", clusterName);
ASSERT(false);
}
return Void();
}
ACTOR static Future<Void> decommissionMetacluster(MetaclusterManagementWorkload* self) {
state bool empty = self->createdTenants.empty();
for (auto db : self->dataDbs) {
empty = empty && !db.second->registered;
}
try {
wait(metacluster::decommissionMetacluster(self->managementDb));
ASSERT(self->metaclusterCreated);
ASSERT(self->isValidVersion());
ASSERT(empty);
self->metaclusterCreated = false;
} catch (Error& e) {
if (e.code() == error_code_cluster_not_empty) {
ASSERT(!empty);
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
} else if (e.code() == error_code_unsupported_metacluster_version) {
ASSERT(!self->isValidVersion());
return Void();
}
TraceEvent(SevError, "DecommissionMetaclusterFailure").error(e);
ASSERT(false);
}
return Void();
}
ACTOR static Future<Void> registerCluster(MetaclusterManagementWorkload* self) {
state ClusterName clusterName = self->chooseClusterName();
state Reference<DataClusterData> dataDb = self->dataDbs[clusterName];
@ -207,6 +366,13 @@ struct MetaclusterManagementWorkload : TestWorkload {
ASSERT(!clusterName.startsWith("\xff"_sr));
ASSERT(!dataDb->registered);
ASSERT(!dataDb->detached || dataDb->tenants.empty());
ASSERT(self->isValidVersion());
ASSERT(self->metaclusterCreated);
Optional<MetaclusterRegistrationEntry> registration =
wait(metacluster::metadata::metaclusterRegistration().get(dataDb->db.getReference()));
ASSERT(registration.present());
dataDb->version = registration.get().version;
dataDb->tenantGroupCapacity = entry.capacity.numTenantGroups;
self->totalTenantGroupCapacity += entry.capacity.numTenantGroups;
@ -226,6 +392,12 @@ struct MetaclusterManagementWorkload : TestWorkload {
} else if (e.code() == error_code_invalid_cluster_name) {
ASSERT(clusterName.startsWith("\xff"_sr));
return Void();
} else if (e.code() == error_code_unsupported_metacluster_version) {
ASSERT(!self->isValidVersion(dataDb));
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "RegisterClusterFailure").error(e).detail("ClusterName", clusterName);
@ -262,6 +434,12 @@ struct MetaclusterManagementWorkload : TestWorkload {
ASSERT(!clusterMetadata.present());
break;
} else if (e.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
wait(self->checkAndResetMetaclusterVersion(clusterName));
}
} else {
throw;
}
@ -270,6 +448,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
ASSERT(dataDb->registered);
ASSERT(detachCluster || dataDb->tenants.empty());
ASSERT(self->isValidVersion(dataDb));
ASSERT(self->metaclusterCreated);
self->totalTenantGroupCapacity -= std::max<int64_t>(
dataDb->tenantGroups.size() + dataDb->ungroupedTenants.size(), dataDb->tenantGroupCapacity);
@ -299,6 +479,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
} else if (e.code() == error_code_cluster_not_empty && !detachCluster) {
ASSERT(!dataDb->tenants.empty());
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "RemoveClusterFailure").error(e).detail("ClusterName", clusterName);
@ -323,7 +506,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> resolveCollisions(MetaclusterManagementWorkload* self,
ACTOR static Future<bool> resolveCollisions(MetaclusterManagementWorkload* self,
ClusterName clusterName,
Reference<DataClusterData> dataDb) {
state std::set<TenantName> tenantsToRemove;
@ -383,8 +566,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
}
}
ASSERT(foundTenantCollision || foundGroupCollision);
return Void();
return foundTenantCollision || foundGroupCollision;
}
ACTOR static Future<Void> restoreCluster(MetaclusterManagementWorkload* self) {
@ -396,6 +578,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
state std::vector<std::string> messages;
state bool retried = false;
loop {
try {
if (dataDb->detached) {
@ -426,12 +609,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
continue;
}
ASSERT(self->metaclusterCreated);
ASSERT(dataDb->registered || dataDb->detached);
// Since we are not creating a new management cluster with updated tenant ID prefix, it will
// fail to repopulate a management cluster unless we force reuse of the tenant ID prefix or in
// some cases if the cluster is empty
ASSERT(forceReuseTenantIdPrefix || !dataDb->detached || dataDb->tenants.empty());
ASSERT(self->isValidVersion(dataDb));
if (dataDb->detached && !dryRun) {
dataDb->detached = false;
@ -465,13 +645,33 @@ struct MetaclusterManagementWorkload : TestWorkload {
error.code() == error_code_invalid_tenant_configuration) {
ASSERT(dataDb->detached);
wait(removeFailedRestoredCluster(self, clusterName));
wait(resolveCollisions(self, clusterName, dataDb));
continue;
bool resolvedCollisions = wait(resolveCollisions(self, clusterName, dataDb));
if (resolvedCollisions) {
continue;
} else {
// We attempted to restore a cluster with a tenant ID matching an existing tenant, which can
// happen if we create a new metacluster with the same ID as an old one
ASSERT(self->allowTenantIdPrefixReuse);
ASSERT(!messages.empty());
ASSERT(messages[0].find("has the same ID"));
return Void();
}
} else if (error.code() == error_code_invalid_metacluster_configuration) {
ASSERT(!forceReuseTenantIdPrefix && dataDb->detached);
wait(removeFailedRestoredCluster(self, clusterName));
forceReuseTenantIdPrefix = true;
continue;
} else if (error.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
wait(self->checkAndResetMetaclusterVersion(clusterName));
continue;
}
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "RestoreClusterFailure").error(error).detail("ClusterName", clusterName);
@ -492,6 +692,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
wait(metacluster::listClusters(self->managementDb, clusterName1, clusterName2, limit));
ASSERT(clusterName1 <= clusterName2);
ASSERT(self->isValidVersion());
ASSERT(self->metaclusterCreated);
auto resultItr = clusterList.begin();
@ -514,7 +716,14 @@ struct MetaclusterManagementWorkload : TestWorkload {
if (e.code() == error_code_inverted_range) {
ASSERT(clusterName1 > clusterName2);
return Void();
} else if (e.code() == error_code_unsupported_metacluster_version) {
ASSERT(!self->isValidVersion());
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "ListClustersFailure")
.error(e)
.detail("BeginClusterName", clusterName1)
@ -533,13 +742,23 @@ struct MetaclusterManagementWorkload : TestWorkload {
try {
metacluster::DataClusterMetadata clusterMetadata =
wait(metacluster::getCluster(self->managementDb, clusterName));
ASSERT(dataDb->registered);
ASSERT(self->isValidVersion());
ASSERT(self->metaclusterCreated);
ASSERT(dataDb->db->getConnectionRecord()->getConnectionString() == clusterMetadata.connectionString);
} catch (Error& e) {
if (e.code() == error_code_cluster_not_found) {
ASSERT(!dataDb->registered);
return Void();
} else if (e.code() == error_code_unsupported_metacluster_version) {
ASSERT(!self->isValidVersion());
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "GetClusterFailure").error(e).detail("ClusterName", clusterName);
ASSERT(false);
}
@ -603,6 +822,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
}
}
ASSERT(self->metaclusterCreated);
ASSERT(self->isValidVersion());
if (updatedEntry.present()) {
int allocatedGroups = dataDb->tenantGroups.size() + dataDb->ungroupedTenants.size();
int64_t tenantGroupDelta =
@ -613,6 +835,14 @@ struct MetaclusterManagementWorkload : TestWorkload {
dataDb->tenantGroupCapacity = updatedEntry.get().capacity.numTenantGroups;
}
} catch (Error& e) {
if (e.code() == error_code_unsupported_metacluster_version) {
ASSERT(!self->isValidVersion());
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "ConfigureClusterFailure").error(e).detail("ClusterName", clusterName);
ASSERT(false);
}
@ -729,6 +959,16 @@ struct MetaclusterManagementWorkload : TestWorkload {
}
throw error;
} else if (e.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
Optional<metacluster::MetaclusterTenantMapEntry> entry =
wait(metacluster::tryGetTenant(self->managementDb, tenant));
ASSERT(entry.present());
wait(self->checkAndResetMetaclusterVersion(entry.get().assignedCluster));
continue;
}
} else {
throw;
}
@ -737,6 +977,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
metacluster::MetaclusterTenantMapEntry entry = wait(metacluster::getTenant(self->managementDb, tenant));
ASSERT(self->metaclusterCreated);
ASSERT(!exists);
ASSERT(hasCapacity);
ASSERT(entry.tenantGroup == tenantGroup);
@ -750,6 +991,8 @@ struct MetaclusterManagementWorkload : TestWorkload {
ASSERT(assignedCluster != self->dataDbs.end());
ASSERT(assignedCluster->second->tenants.try_emplace(tenant, tenantData).second);
ASSERT(self->isValidVersion(assignedCluster->second));
if (tenantGroup.present()) {
auto tenantGroupData =
self->tenantGroups
@ -785,6 +1028,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
ASSERT(itr != self->tenantGroups.end());
ASSERT(itr->second->cluster != tenantMapEntry.assignedCluster);
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
@ -819,12 +1065,22 @@ struct MetaclusterManagementWorkload : TestWorkload {
wait(metacluster::tryGetTenant(self->managementDb, tenant));
ASSERT(!entry.present());
break;
} else if (e.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
auto tenantItr = self->createdTenants.find(tenant);
ASSERT(tenantItr != self->createdTenants.end());
wait(self->checkAndResetMetaclusterVersion(tenantItr->second->cluster));
continue;
}
} else {
throw;
}
}
}
ASSERT(self->metaclusterCreated);
ASSERT(exists);
auto tenantData = self->createdTenants.find(tenant);
ASSERT(tenantData != self->createdTenants.end());
@ -846,6 +1102,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
ASSERT(dataDb->registered);
auto tenantItr = dataDb->tenants.find(tenant);
ASSERT(tenantItr != dataDb->tenants.end());
ASSERT(self->isValidVersion(dataDb));
bool reducedAllocatedCount = false;
if (erasedTenantGroup) {
@ -866,6 +1123,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
if (e.code() == error_code_tenant_not_found) {
ASSERT(!exists);
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "DeleteTenantFailure").error(e).detail("TenantName", tenant);
@ -902,18 +1162,38 @@ struct MetaclusterManagementWorkload : TestWorkload {
{ "assigned_cluster"_sr, newClusterName }, { "tenant_group"_sr, newTenantGroup }
};
state bool configChanged =
exists && (itr->second->tenantGroup != newTenantGroup || oldClusterName != newClusterName);
try {
loop {
Future<Void> configureFuture = metacluster::configureTenant(
self->managementDb, tenant, configurationParameters, ignoreCapacityLimit);
Optional<Void> result = wait(timeout(configureFuture, deterministicRandom()->randomInt(1, 30)));
try {
Optional<Void> result = wait(timeout(configureFuture, deterministicRandom()->randomInt(1, 30)));
if (result.present()) {
break;
if (result.present()) {
break;
}
wait(verifyListFilter(self, tenant, "configureTenant"));
} catch (Error& e) {
state Error error = e;
if (e.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
auto tenantItr = self->createdTenants.find(tenant);
ASSERT(tenantItr != self->createdTenants.end());
wait(self->checkAndResetMetaclusterVersion(tenantItr->second->cluster));
continue;
}
}
throw error;
}
wait(verifyListFilter(self, tenant, "configureTenant"));
}
ASSERT(self->metaclusterCreated);
ASSERT(exists);
auto tenantData = self->createdTenants.find(tenant);
ASSERT(tenantData != self->createdTenants.end());
@ -921,6 +1201,12 @@ struct MetaclusterManagementWorkload : TestWorkload {
auto& dataDb = self->dataDbs[tenantData->second->cluster];
ASSERT(dataDb->registered);
if (configChanged) {
ASSERT(self->isValidVersion(dataDb));
} else {
ASSERT(self->isValidVersion());
}
bool allocationRemoved = false;
bool allocationAdded = false;
if (tenantData->second->tenantGroup != newTenantGroup) {
@ -980,6 +1266,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
self->createdTenants[tenant]->cluster != self->tenantGroups[newTenantGroup.get()]->cluster);
}
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "ConfigureTenantFailure")
@ -992,6 +1281,87 @@ struct MetaclusterManagementWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> lockTenant(MetaclusterManagementWorkload* self) {
state TenantName tenant = self->chooseTenantName();
state TenantAPI::TenantLockState lockState = (TenantAPI::TenantLockState)deterministicRandom()->randomInt(0, 3);
auto itr = self->createdTenants.find(tenant);
state bool exists = itr != self->createdTenants.end();
state UID lockId = exists && itr->second->lockId.present() && deterministicRandom()->coinflip()
? itr->second->lockId.get()
: deterministicRandom()->randomUniqueID();
try {
loop {
Future<Void> lockFuture =
metacluster::changeTenantLockState(self->managementDb, tenant, lockState, lockId);
try {
Optional<Void> result = wait(timeout(lockFuture, deterministicRandom()->randomInt(1, 30)));
if (result.present()) {
break;
}
} catch (Error& e) {
state Error error = e;
if (e.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
auto tenantItr = self->createdTenants.find(tenant);
ASSERT(tenantItr != self->createdTenants.end());
wait(self->checkAndResetMetaclusterVersion(tenantItr->second->cluster));
continue;
}
}
throw error;
}
}
ASSERT(self->metaclusterCreated);
ASSERT(exists);
auto tenantData = self->createdTenants.find(tenant);
ASSERT(tenantData != self->createdTenants.end());
auto& dataDb = self->dataDbs[tenantData->second->cluster];
ASSERT(dataDb->registered);
if (lockState == tenantData->second->lockState) {
ASSERT(self->isValidVersion());
} else {
ASSERT(self->isValidVersion(dataDb));
}
if (lockState == TenantAPI::TenantLockState::UNLOCKED) {
tenantData->second->lockId = {};
} else {
tenantData->second->lockId = lockId;
}
tenantData->second->lockState = lockState;
} catch (Error& e) {
if (e.code() == error_code_tenant_not_found) {
ASSERT(!exists);
return Void();
} else if (e.code() == error_code_invalid_tenant_configuration) {
ASSERT(exists);
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "LockTenantFailure")
.error(e)
.detail("TenantName", tenant)
.detail("LockState", TenantAPI::tenantLockStateToString(lockState))
.detail("LockId", lockId);
ASSERT(false);
}
return Void();
}
ACTOR static Future<Void> renameTenant(MetaclusterManagementWorkload* self) {
state TenantName tenant = self->chooseTenantName();
state TenantName newTenantName = self->chooseTenantName();
@ -1016,15 +1386,28 @@ struct MetaclusterManagementWorkload : TestWorkload {
retried = true;
wait(verifyListFilter(self, tenant, "renameTenant"));
} catch (Error& e) {
state Error error = e;
// If we retry the rename after it had succeeded, we will get an error that we should ignore
if (e.code() == error_code_tenant_not_found && exists && !newTenantExists && retried) {
break;
} else if (e.code() == error_code_unsupported_metacluster_version) {
if (!self->isValidVersion()) {
return Void();
} else {
auto tenantItr = self->createdTenants.find(tenant);
ASSERT(tenantItr != self->createdTenants.end());
wait(self->checkAndResetMetaclusterVersion(tenantItr->second->cluster));
continue;
}
}
throw e;
throw error;
}
}
wait(verifyListFilter(self, newTenantName, "renameTenantNew"));
ASSERT(self->metaclusterCreated);
ASSERT(exists);
ASSERT(!newTenantExists);
@ -1047,6 +1430,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
auto& dataDb = self->dataDbs[newEntry.assignedCluster];
ASSERT(dataDb->registered);
ASSERT(self->isValidVersion(dataDb));
dataDb->tenants.erase(tenant);
dataDb->tenants[newTenantName] = tenantData;
@ -1068,6 +1452,9 @@ struct MetaclusterManagementWorkload : TestWorkload {
} else if (e.code() == error_code_tenant_already_exists) {
ASSERT(newTenantExists);
return Void();
} else if (e.code() == error_code_invalid_metacluster_operation) {
ASSERT(!self->metaclusterCreated);
return Void();
}
TraceEvent(SevError, "RenameTenantFailure")
@ -1080,6 +1467,27 @@ struct MetaclusterManagementWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> changeClusterVersion(MetaclusterManagementWorkload* self) {
state MetaclusterVersion newVersion = (MetaclusterVersion)deterministicRandom()->randomInt(
(int)MetaclusterVersion::BEGIN, (int)MetaclusterVersion::END + 1);
if (deterministicRandom()->coinflip()) {
if (self->metaclusterCreated) {
self->managementVersion = newVersion;
wait(self->setMetaclusterVersion(self->managementDb, self->managementVersion));
}
} else {
ClusterName clusterName = self->chooseClusterName();
auto& dataDb = self->dataDbs[clusterName];
if (dataDb->registered) {
dataDb->version = newVersion;
wait(self->setMetaclusterVersion(dataDb->db.getReference(), dataDb->version));
}
}
return Void();
}
Future<Void> start(Database const& cx) override {
if (clientId == 0) {
return _start(cx, this);
@ -1092,27 +1500,35 @@ struct MetaclusterManagementWorkload : TestWorkload {
// Run a random sequence of operations for the duration of the test
while (now() < start + self->testDuration) {
state int operation = deterministicRandom()->randomInt(0, 10);
state int operation = deterministicRandom()->randomInt(0, 14);
if (operation == 0) {
wait(registerCluster(self));
wait(createMetacluster(self));
} else if (operation == 1) {
wait(removeCluster(self));
wait(decommissionMetacluster(self));
} else if (operation == 2) {
wait(listClusters(self));
wait(registerCluster(self));
} else if (operation == 3) {
wait(getCluster(self));
wait(removeCluster(self));
} else if (operation == 4) {
wait(configureCluster(self));
wait(listClusters(self));
} else if (operation == 5) {
wait(createTenant(self));
wait(getCluster(self));
} else if (operation == 6) {
wait(deleteTenant(self));
wait(configureCluster(self));
} else if (operation == 7) {
wait(configureTenant(self));
} else if (operation == 8) {
wait(renameTenant(self));
} else if (operation == 9) {
wait(restoreCluster(self));
} else if (operation == 8) {
wait(createTenant(self));
} else if (operation == 9) {
wait(deleteTenant(self));
} else if (operation == 10) {
wait(configureTenant(self));
} else if (operation == 11) {
wait(lockTenant(self));
} else if (operation == 12) {
wait(renameTenant(self));
} else if (operation == 13) {
wait(changeClusterVersion(self));
}
}
@ -1165,7 +1581,7 @@ struct MetaclusterManagementWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> decommissionMetacluster(MetaclusterManagementWorkload* self) {
ACTOR static Future<Void> teardownMetacluster(MetaclusterManagementWorkload* self) {
state Reference<ITransaction> tr = self->managementDb->createTransaction();
state bool deleteTenants = deterministicRandom()->coinflip();
@ -1212,36 +1628,64 @@ struct MetaclusterManagementWorkload : TestWorkload {
}
}
ACTOR static Future<bool> _check(MetaclusterManagementWorkload* self) {
// The metacluster consistency check runs the tenant consistency check for each cluster
state metacluster::util::MetaclusterConsistencyCheck<IDatabase> metaclusterConsistencyCheck(
self->managementDb, metacluster::util::AllowPartialMetaclusterOperations::False);
wait(metaclusterConsistencyCheck.run());
std::map<ClusterName, metacluster::DataClusterMetadata> dataClusters = wait(
metacluster::listClusters(self->managementDb, ""_sr, "\xff\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS + 1));
int totalTenantGroupsAllocated = 0;
std::vector<Future<Void>> dataClusterChecks;
for (auto [clusterName, dataClusterData] : self->dataDbs) {
auto dataClusterItr = dataClusters.find(clusterName);
if (dataClusterData->registered) {
ASSERT(dataClusterItr != dataClusters.end());
ASSERT(dataClusterItr->second.entry.capacity.numTenantGroups == dataClusterData->tenantGroupCapacity);
totalTenantGroupsAllocated +=
dataClusterData->tenantGroups.size() + dataClusterData->ungroupedTenants.size();
} else {
ASSERT(dataClusterItr == dataClusters.end());
}
dataClusterChecks.push_back(checkDataCluster(self, clusterName, dataClusterData));
std::vector<Future<Void>> setVersionFutures;
if (self->metaclusterCreated) {
self->managementVersion = MetaclusterVersion::MAX_SUPPORTED;
setVersionFutures.push_back(self->setMetaclusterVersion(self->managementDb, self->managementVersion));
}
for (auto& [name, dataDb] : self->dataDbs) {
if (dataDb->registered) {
dataDb->version = MetaclusterVersion::MAX_SUPPORTED;
setVersionFutures.push_back(self->setMetaclusterVersion(dataDb->db.getReference(), dataDb->version));
}
}
auto capacityNumbers = metacluster::util::metaclusterCapacity(dataClusters);
ASSERT(capacityNumbers.first.numTenantGroups == self->totalTenantGroupCapacity);
ASSERT(capacityNumbers.second.numTenantGroups == totalTenantGroupsAllocated);
wait(waitForAll(dataClusterChecks));
wait(waitForAll(setVersionFutures));
wait(decommissionMetacluster(self));
// The metacluster consistency check runs the tenant consistency check for each cluster
if (self->metaclusterCreated) {
state metacluster::util::MetaclusterConsistencyCheck<IDatabase> metaclusterConsistencyCheck(
self->managementDb, metacluster::util::AllowPartialMetaclusterOperations::False);
wait(metaclusterConsistencyCheck.run());
std::map<ClusterName, metacluster::DataClusterMetadata> dataClusters = wait(metacluster::listClusters(
self->managementDb, ""_sr, "\xff\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS + 1));
int totalTenantGroupsAllocated = 0;
std::vector<Future<Void>> dataClusterChecks;
for (auto [clusterName, dataClusterData] : self->dataDbs) {
auto dataClusterItr = dataClusters.find(clusterName);
if (dataClusterData->registered) {
ASSERT(dataClusterItr != dataClusters.end());
ASSERT(dataClusterItr->second.entry.capacity.numTenantGroups ==
dataClusterData->tenantGroupCapacity);
totalTenantGroupsAllocated +=
dataClusterData->tenantGroups.size() + dataClusterData->ungroupedTenants.size();
} else {
ASSERT(dataClusterItr == dataClusters.end());
}
dataClusterChecks.push_back(checkDataCluster(self, clusterName, dataClusterData));
}
auto capacityNumbers = metacluster::util::metaclusterCapacity(dataClusters);
ASSERT(capacityNumbers.first.numTenantGroups == self->totalTenantGroupCapacity);
ASSERT(capacityNumbers.second.numTenantGroups == totalTenantGroupsAllocated);
wait(waitForAll(dataClusterChecks));
wait(teardownMetacluster(self));
} else {
Optional<MetaclusterRegistrationEntry> managementRegistration =
wait(metacluster::metadata::metaclusterRegistration().get(self->managementDb));
ASSERT(!managementRegistration.present());
state std::map<ClusterName, Reference<DataClusterData>>::iterator dataDbItr;
for (dataDbItr = self->dataDbs.begin(); dataDbItr != self->dataDbs.end(); ++dataDbItr) {
ASSERT(!dataDbItr->second->registered);
Optional<MetaclusterRegistrationEntry> dataDbRegistration =
wait(metacluster::metadata::metaclusterRegistration().get(dataDbItr->second->db.getReference()));
ASSERT(!dataDbRegistration.present());
}
}
return true;
}

View File

@ -260,8 +260,8 @@ struct TenantManagementWorkload : TestWorkload {
self->mvDb = simMetacluster.managementDb;
if (self->useMetacluster) {
self->dataClusterName = simMetacluster.dataDbs.begin()->first;
ASSERT_EQ(simMetacluster.dataDbs.size(), 1);
self->dataClusterName = simMetacluster.dataDbs.begin()->first;
self->dataDb = simMetacluster.dataDbs.begin()->second;
} else {
self->dataDb = cx;

View File

@ -285,6 +285,7 @@ ERROR( invalid_data_cluster, 2171, "The data cluster being restored has no recor
ERROR( metacluster_mismatch, 2172, "The cluster does not have the expected name or is associated with a different metacluster" )
ERROR( conflicting_restore, 2173, "Another restore is running for the same data cluster" )
ERROR( invalid_metacluster_configuration, 2174, "Metacluster configuration is invalid" )
ERROR( unsupported_metacluster_version, 2175, "Client is not compatible with the metacluster" )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )

View File

@ -57,6 +57,13 @@ ACTOR Future<MetaclusterMetrics> getMetaclusterMetricsImpl(Database db) {
return metrics;
} catch (Error& e) {
TraceEvent("MetaclusterUpdaterError").error(e);
if (e.code() == error_code_unsupported_metacluster_version) {
TraceEvent(SevWarnAlways, "MetaclusterMetricsFailure").error(e);
MetaclusterMetrics metrics;
metrics.error = e.what();
return metrics;
}
wait(tr->onError(e));
}
}

View File

@ -99,7 +99,8 @@ Future<Optional<std::string>> createMetacluster(Reference<DB> db,
metaclusterUid = deterministicRandom()->randomUniqueID();
}
metadata::metaclusterRegistration().set(tr, MetaclusterRegistrationEntry(name, metaclusterUid.get()));
metadata::metaclusterRegistration().set(
tr, MetaclusterRegistrationEntry(name, metaclusterUid.get(), MetaclusterVersion::V1));
TenantMetadata::tenantIdPrefix().set(tr, tenantIdPrefix);

View File

@ -80,6 +80,9 @@ private:
ASSERT_EQ(data.metaclusterRegistration.get().clusterType, ClusterType::METACLUSTER_MANAGEMENT);
ASSERT(data.metaclusterRegistration.get().id == data.metaclusterRegistration.get().metaclusterId &&
data.metaclusterRegistration.get().name == data.metaclusterRegistration.get().metaclusterName);
ASSERT_GE(data.metaclusterRegistration.get().version, MetaclusterVersion::MIN_SUPPORTED);
ASSERT_LE(data.metaclusterRegistration.get().version, MetaclusterVersion::MAX_SUPPORTED);
ASSERT_LE(data.dataClusters.size(), CLIENT_KNOBS->MAX_DATA_CLUSTERS);
ASSERT_LE(data.tenantData.tenantCount, metaclusterMaxTenants);
ASSERT(data.clusterTenantCounts.results.size() <= data.dataClusters.size() && !data.clusterTenantCounts.more);
@ -198,6 +201,9 @@ private:
ASSERT(data.metaclusterRegistration.get().matches(managementData.metaclusterRegistration.get()));
ASSERT(data.metaclusterRegistration.get().name == clusterName);
ASSERT(data.metaclusterRegistration.get().id == clusterMetadata.entry.id);
ASSERT_GE(data.metaclusterRegistration.get().version, MetaclusterVersion::MIN_SUPPORTED);
ASSERT_LE(data.metaclusterRegistration.get().version, MetaclusterVersion::MAX_SUPPORTED);
ASSERT_EQ(data.metaclusterRegistration.get().version, managementData.metaclusterRegistration.get().version);
if (data.tenantData.lastTenantId >= 0) {
ASSERT_EQ(TenantAPI::getTenantIdPrefix(data.tenantData.lastTenantId), managementData.tenantIdPrefix);

View File

@ -32,6 +32,8 @@ struct MetaclusterMetrics {
int tenantGroupCapacity = 0;
int tenantGroupsAllocated = 0;
Optional<std::string> error;
MetaclusterMetrics() = default;
static Future<MetaclusterMetrics> getMetaclusterMetrics(Database db);