Cleanup metadata associated with a cluster when force removing it. Move some metacluster metadata into the normal key-space. Cleanup some todos.

This commit is contained in:
A.J. Beamon 2022-06-24 14:55:57 -07:00
parent 9260367cac
commit 4b69723a2f
9 changed files with 210 additions and 38 deletions

View File

@ -245,6 +245,9 @@ ACTOR Future<bool> metaclusterGetCommand(Reference<IDatabase> db, std::vector<St
fmt::print(" connection string: {}\n", metadata.connectionString.toString().c_str());
fmt::print(" tenant group capacity: {}\n", metadata.entry.capacity.numTenantGroups);
fmt::print(" allocated tenant groups: {}\n", metadata.entry.allocated.numTenantGroups);
if (metadata.entry.locked) {
fmt::print(" locked: true\n");
}
}
} catch (Error& e) {
if (useJson) {

View File

@ -289,6 +289,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( MAX_TENANTS_PER_CLUSTER, 1e6 );
init( MAX_DATA_CLUSTERS, 1e5 );
init( REMOVE_CLUSTER_TENANT_BATCH_SIZE, 1e4 ); if ( randomize && BUGGIFY ) REMOVE_CLUSTER_TENANT_BATCH_SIZE = 1;
// clang-format on
}

View File

@ -280,6 +280,7 @@ public:
int MAX_TENANTS_PER_CLUSTER;
int MAX_DATA_CLUSTERS;
int REMOVE_CLUSTER_TENANT_BATCH_SIZE;
ClientKnobs(Randomize randomize);
void initialize(Randomize randomize);

View File

@ -60,6 +60,10 @@ struct DataClusterEntry {
ClusterUsage capacity;
ClusterUsage allocated;
// If true, then tenants cannot be assigned to this cluster. This is used when a cluster is being forcefully
// removed.
bool locked = false;
DataClusterEntry() = default;
DataClusterEntry(ClusterUsage capacity) : capacity(capacity) {}
DataClusterEntry(UID id, ClusterUsage capacity, ClusterUsage allocated)
@ -84,12 +88,15 @@ struct DataClusterEntry {
json_spirit::mObject obj;
obj["capacity"] = capacity.toJson();
obj["allocated"] = allocated.toJson();
if (locked) {
obj["locked"] = locked;
}
return obj;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, capacity, allocated);
serializer(ar, id, capacity, allocated, locked);
}
};

View File

@ -37,4 +37,23 @@ ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connecti
return MultiVersionApi::api->createDatabase(clusterFile);
}
}
Key getDataClusterTenantIndexKey(ClusterNameRef cluster, Optional<TenantNameRef> tenant) {
Tuple tuple;
tuple.append(cluster);
if (tenant.present()) {
tuple.append(tenant.get());
}
return dataClusterTenantIndexKeys.begin.withSuffix(tuple.pack());
}
Key getDataClusterTenantGroupIndexKey(ClusterNameRef cluster, Optional<TenantGroupNameRef> tenantGroup) {
Tuple tuple;
tuple.append(cluster);
if (tenantGroup.present()) {
tuple.append(tenantGroup.get());
}
return dataClusterTenantGroupIndexKeys.begin.withSuffix(tuple.pack());
}
}; // namespace MetaclusterAPI

View File

@ -32,6 +32,7 @@
#include "fdbclient/Metacluster.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbclient/VersionedMap.h"
#include "flow/flat_buffers.h"
#include "flow/actorcompiler.h" // has to be last include
@ -84,10 +85,13 @@ namespace MetaclusterAPI {
ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connectionString);
Key getDataClusterTenantIndexKey(ClusterNameRef cluster, Optional<TenantNameRef> tenant);
Key getDataClusterTenantGroupIndexKey(ClusterNameRef cluster, Optional<TenantGroupNameRef> tenantGroup);
ACTOR template <class Transaction>
Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, ClusterNameRef name) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
state Key dataClusterMetadataKey = dataClusterMetadataPrefix.withSuffix(name);
state Key dataClusterConnectionRecordKey = dataClusterConnectionRecordPrefix.withSuffix(name);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
@ -269,10 +273,10 @@ void updateClusterMetadata(Transaction tr,
Optional<DataClusterEntry> updatedEntry) {
if (updatedEntry.present()) {
tr->set(name.withPrefix(dataClusterMetadataPrefix), updatedEntry.get().encode());
tr->set(dataClusterMetadataPrefix.withSuffix(name), updatedEntry.get().encode());
}
if (updatedConnectionString.present()) {
tr->set(name.withPrefix(dataClusterConnectionRecordPrefix), updatedConnectionString.get().toString());
tr->set(dataClusterConnectionRecordPrefix.withSuffix(name), updatedConnectionString.get().toString());
}
}
@ -306,8 +310,8 @@ Future<Void> managementClusterRegister(Transaction tr,
ClusterNameRef name,
ClusterConnectionString connectionString,
DataClusterEntry entry) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
state Key dataClusterMetadataKey = dataClusterMetadataPrefix.withSuffix(name);
state Key dataClusterConnectionRecordKey = dataClusterConnectionRecordPrefix.withSuffix(name);
std::pair<MetaclusterRegistrationEntry, bool> result =
wait(managementClusterRegisterPrecheck(tr, name, DataClusterMetadata(entry, connectionString)));
@ -485,26 +489,127 @@ Future<Void> restoreCluster(Reference<DB> db,
}
}
// Returns the cluster metadata for the cluster being deleted, as well as a boolean that will be true if the entry
// has been removed. If false, then it's the responsibility of the caller to purge the data cluster from the management
// cluster.
ACTOR template <class Transaction>
Future<Optional<DataClusterMetadata>> managementClusterRemove(Transaction tr, ClusterNameRef name, bool checkEmpty) {
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
state Key dataClusterConnectionRecordKey = name.withPrefix(dataClusterConnectionRecordPrefix);
Future<std::pair<Optional<DataClusterMetadata>, bool>> managementClusterRemove(Transaction tr,
ClusterNameRef name,
bool checkEmpty) {
state Key dataClusterMetadataKey = dataClusterMetadataPrefix.withSuffix(name);
state Key dataClusterConnectionRecordKey = dataClusterConnectionRecordPrefix.withSuffix(name);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
state Optional<DataClusterMetadata> metadata = wait(tryGetClusterTransaction(tr, name));
if (!metadata.present()) {
return metadata;
return std::make_pair(metadata, true);
}
if (checkEmpty && metadata.get().entry.allocated.numTenantGroups > 0) {
throw cluster_not_empty();
} else if (metadata.get().entry.allocated.numTenantGroups == 0) {
tr->clear(dataClusterMetadataKey);
tr->clear(dataClusterConnectionRecordKey);
return std::make_pair(metadata, true);
} else {
// We need to clean up the tenant metadata for this cluster before erasing it. While we are doing that,
// lock the entry to prevent other assignments.
DataClusterEntry updatedEntry = metadata.get().entry;
updatedEntry.locked = true;
updateClusterMetadata(tr, name, Optional<ClusterConnectionString>(), updatedEntry);
return std::make_pair(metadata, false);
}
}
ACTOR template <class DB>
Future<Void> managementClusterPurgeDataCluster(Reference<DB> db, ClusterNameRef name, UID dataClusterId) {
// Remove all records associated with this cluster from the management cluster
state KeyRange tenantIndexKeys = prefixRange(getDataClusterTenantIndexKey(name, Optional<TenantNameRef>()));
state KeyRange tenantGroupIndexKeys =
prefixRange(getDataClusterTenantGroupIndexKey(name, Optional<TenantGroupNameRef>()));
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state typename DB::TransactionT::template FutureT<RangeResult> tenantEntriesFuture =
tr->getRange(tenantIndexKeys, CLIENT_KNOBS->REMOVE_CLUSTER_TENANT_BATCH_SIZE);
Optional<DataClusterMetadata> clusterMetadata = wait(tryGetClusterTransaction(tr, name));
if (!clusterMetadata.present() && clusterMetadata.get().entry.id != dataClusterId) {
// Someone else must have already done the purge
return Void();
}
state RangeResult tenantEntries = wait(safeThreadFutureToFuture(tenantEntriesFuture));
if (tenantEntries.empty()) {
break;
}
for (auto entry : tenantEntries) {
Tuple result = Tuple::unpack(entry.key.removePrefix(dataClusterTenantIndexKeys.begin));
ASSERT(result.getString(0) == name);
tr->clear(tenantMapPrefix.withSuffix(result.getString(1)));
}
tr->clear(KeyRangeRef(tenantIndexKeys.begin, keyAfter(tenantEntries.rbegin()->key)));
wait(buggifiedCommit(tr, BUGGIFY));
tr->reset();
if (!tenantEntries.more) {
break;
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
tr->clear(dataClusterMetadataKey);
tr->clear(dataClusterConnectionRecordKey);
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state typename DB::TransactionT::template FutureT<RangeResult> tenantGroupEntriesFuture =
tr->getRange(tenantGroupIndexKeys, CLIENT_KNOBS->REMOVE_CLUSTER_TENANT_BATCH_SIZE);
return metadata;
Optional<DataClusterMetadata> clusterMetadata = wait(tryGetClusterTransaction(tr, name));
if (!clusterMetadata.present() && clusterMetadata.get().entry.id != dataClusterId) {
// Someone else must have already done the purge
return Void();
}
state RangeResult tenantGroupEntries = wait(safeThreadFutureToFuture(tenantGroupEntriesFuture));
for (auto entry : tenantGroupEntries) {
Tuple result = Tuple::unpack(entry.key.removePrefix(dataClusterTenantGroupIndexKeys.begin));
ASSERT(result.getString(0) == name);
TenantGroupName tenantGroup = result.getString(1);
tr->clear(prefixRange(TenantAPI::getTenantGroupIndexKey(tenantGroup, Optional<TenantNameRef>())));
tr->clear(tenantGroupMetadataKeys.begin.withSuffix(tenantGroup));
}
if (!tenantGroupEntries.empty()) {
tr->clear(KeyRangeRef(tenantGroupIndexKeys.begin, keyAfter(tenantGroupEntries.rbegin()->key)));
}
if (!tenantGroupEntries.more) {
tr->clear(dataClusterMetadataPrefix.withSuffix(name));
tr->clear(dataClusterConnectionRecordPrefix.withSuffix(name));
}
wait(buggifiedCommit(tr, BUGGIFY));
tr->reset();
if (!tenantGroupEntries.more) {
TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
break;
}
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
return Void();
}
ACTOR template <class Transaction>
@ -549,13 +654,15 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
state DataClusterMetadata metadata;
state Optional<int64_t> lastTenantId;
state Optional<UID> removedId;
state bool hasBeenPurged = false;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
Optional<DataClusterMetadata> _metadata = wait(managementClusterRemove(tr, name, !forceRemove));
if (!_metadata.present()) {
std::pair<Optional<DataClusterMetadata>, bool> result =
wait(managementClusterRemove(tr, name, !forceRemove));
if (!result.first.present()) {
if (!removedId.present()) {
throw cluster_not_found();
} else {
@ -563,7 +670,7 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
}
}
metadata = _metadata.get();
metadata = result.first.get();
if (!removedId.present()) {
removedId = metadata.entry.id;
} else if (removedId.get() != metadata.entry.id) {
@ -571,6 +678,7 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
return Void();
}
hasBeenPurged = result.second;
if (forceRemove) {
state typename DB::TransactionT::template FutureT<Optional<Value>> lastIdFuture =
tr->get(tenantLastIdKey);
@ -582,7 +690,11 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
wait(buggifiedCommit(tr, BUGGIFY));
TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
if (hasBeenPurged) {
TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
} else {
TraceEvent("LockedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
}
break;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
@ -591,7 +703,13 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
ASSERT(removedId.present());
// Step 2: Reconfigure data cluster and remove metadata.
// Step 2: Purge all metadata associated with the data cluster from the management cluster if this was not already
// completed in step 1.
if (!hasBeenPurged) {
wait(managementClusterPurgeDataCluster(db, name, removedId.get()));
}
// Step 3: Update the data cluster to mark it as removed.
// Note that this is best effort; if it fails the cluster will still have been removed.
state Reference<IDatabase> dataClusterDb = wait(openDatabase(metadata.connectionString));
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
@ -782,11 +900,18 @@ Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tena
Optional<DataClusterMetadata> actualMetadata =
wait(tryGetClusterTransaction(assignTr, createdTenant.assignedCluster.get()));
// TODO: move the tenant to an error state?
// A cluster cannot be removed through these APIs unless it has no tenants assigned to it.
ASSERT(actualMetadata.present());
clusterMetadata = actualMetadata.get();
}
} else {
assignTr->set(getDataClusterTenantIndexKey(createdTenant.assignedCluster.get(), name), ""_sr);
if (tenantEntry.tenantGroup.present()) {
assignTr->set(getDataClusterTenantGroupIndexKey(createdTenant.assignedCluster.get(),
tenantEntry.tenantGroup.get()),
""_sr);
}
assignTr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(tenantEntry.id));
wait(buggifiedCommit(assignTr, BUGGIFY));
}
@ -857,10 +982,9 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
Optional<DataClusterMetadata> _clusterMetadata =
wait(tryGetClusterTransaction(managementTr, tenantEntry1.get().assignedCluster.get()));
if (!_clusterMetadata.present()) {
// TODO: better error
throw operation_failed();
}
// A cluster cannot be removed through these APIs unless it has no tenants assigned to it.
ASSERT(_clusterMetadata.present());
clusterMetadata = _clusterMetadata.get();
alreadyRemoving = tenantEntry1.get().tenantState == TenantState::REMOVING;
} else {
@ -957,12 +1081,21 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
state bool decrementTenantGroupCount =
finalClusterMetadata.present() && !tenantEntry3.get().tenantGroup.present();
if (finalClusterMetadata.present() && tenantEntry3.get().tenantGroup.present()) {
RangeResult result = wait(safeThreadFutureToFuture(tenantGroupIndexFuture));
if (result.size() == 0) {
managementTr->clear(
tenantGroupMetadataKeys.begin.withSuffix(tenantEntry3.get().tenantGroup.get()));
decrementTenantGroupCount = true;
if (finalClusterMetadata.present()) {
managementTr->clear(getDataClusterTenantIndexKey(tenantEntry3.get().assignedCluster.get(), name),
""_sr);
if (tenantEntry3.get().tenantGroup.present()) {
managementTr->clear(getDataClusterTenantGroupIndexKey(tenantEntry3.get().assignedCluster.get(),
tenantEntry3.get().tenantGroup.get()),
""_sr);
RangeResult result = wait(safeThreadFutureToFuture(tenantGroupIndexFuture));
if (result.size() == 0) {
managementTr->clear(
tenantGroupMetadataKeys.begin.withSuffix(tenantEntry3.get().tenantGroup.get()));
decrementTenantGroupCount = true;
}
}
}
if (decrementTenantGroupCount) {

View File

@ -1425,15 +1425,20 @@ const KeyRangeRef tenantGroupTenantIndexKeys("\xff/tenant/tenantGroup/tenantMap/
const KeyRangeRef tenantTombstoneKeys("\xff/tenant/tombstones/"_sr, "\xff/tenant/tombstones0"_sr);
// Metacluster management cluster keys
const KeyRangeRef dataClusterMetadataKeys("\xff/metacluster/dataCluster/metadata/"_sr,
"\xff/metacluster/dataCluster/metadata0"_sr);
const KeyRangeRef dataClusterMetadataKeys("metacluster/dataCluster/metadata/"_sr,
"metacluster/dataCluster/metadata0"_sr);
const KeyRef dataClusterMetadataPrefix = dataClusterMetadataKeys.begin;
const KeyRangeRef dataClusterConnectionRecordKeys("\xff/metacluster/dataCluster/connectionString/"_sr,
"\xff/metacluster/dataCluster/connectionString0"_sr);
const KeyRangeRef dataClusterConnectionRecordKeys("metacluster/dataCluster/connectionString/"_sr,
"metacluster/dataCluster/connectionString0"_sr);
const KeyRef dataClusterConnectionRecordPrefix = dataClusterConnectionRecordKeys.begin;
const KeyRangeRef tenantGroupMetadataKeys("\xff/metacluster/tenantGroup/metadata/"_sr,
"\xff/metacluster/tenantGroup/metadata0"_sr);
const KeyRangeRef tenantGroupMetadataKeys("metacluster/tenantGroup/metadata/"_sr,
"metacluster/tenantGroup/metadata0"_sr);
const KeyRangeRef dataClusterTenantIndexKeys("metacluster/dataCluster/tenantMap/"_sr,
"metacluster/dataCluster/tenantMap0"_sr);
const KeyRangeRef dataClusterTenantGroupIndexKeys("metacluster/dataCluster/tenantGroupMap/"_sr,
"metacluster/dataCluster/tenantGroupMap0"_sr);
// Metacluster data cluster keys
const KeyRef metaclusterRegistrationKey = "\xff/metacluster/clusterRegistration"_sr;

View File

@ -642,6 +642,8 @@ extern const KeyRangeRef dataClusterConnectionRecordKeys;
extern const KeyRef dataClusterConnectionRecordPrefix;
extern const KeyRef metaclusterRegistrationKey;
extern const KeyRangeRef tenantGroupMetadataKeys;
extern const KeyRangeRef dataClusterTenantIndexKeys;
extern const KeyRangeRef dataClusterTenantGroupIndexKeys;
#pragma clang diagnostic pop

View File

@ -270,8 +270,9 @@ Future<Void> deleteTenantTransaction(Transaction tr,
if (tenantEntry.present() && (!tenantId.present() || tenantEntry.get().id == tenantId.get())) {
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT &&
tenantEntry.get().tenantState != TenantState::REMOVING) {
// TODO: better error
throw operation_failed();
// The metacluster API will not delete a tenant from the management cluster without first putting it into a
// REMOVING state
ASSERT(false);
}
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =