Add an index that allows us to allocate tenants from the non-full cluster with the most tenants already assigned to it.

This commit is contained in:
A.J. Beamon 2022-06-27 12:28:55 -07:00
parent 4b69723a2f
commit eccf244ba6
6 changed files with 116 additions and 53 deletions

View File

@ -171,7 +171,8 @@ ACTOR Future<bool> metaclusterConfigureCommand(Reference<IDatabase> db, std::vec
return false;
}
MetaclusterAPI::updateClusterMetadata(tr, tokens[2], config.get().first, config.get().second);
MetaclusterAPI::updateClusterMetadata(
tr, tokens[2], metadata.get(), config.get().first, config.get().second);
wait(safeThreadFutureToFuture(tr->commit()));
break;

View File

@ -3617,8 +3617,8 @@ struct RestoreDispatchTaskFunc : RestoreTaskFuncBase {
// Get a batch of files. We're targeting batchSize blocks being dispatched so query for batchSize files (each
// of which is 0 or more blocks).
state int taskBatchSize = BUGGIFY ? 1 : CLIENT_KNOBS->RESTORE_DISPATCH_ADDTASK_SIZE;
state RestoreConfig::FileSetT::Values files =
wait(restore.fileSet().getRange(tr, { beginVersion, beginFile }, {}, taskBatchSize));
state RestoreConfig::FileSetT::Values files = wait(restore.fileSet().getRange(
tr, Optional<RestoreConfig::RestoreFile>({ beginVersion, beginFile }), {}, taskBatchSize));
// allPartsDone will be set once all block tasks in the current batch are finished.
state Reference<TaskFuture> allPartsDone;

View File

@ -23,6 +23,7 @@
#include <utility>
#include <vector>
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/Subspace.h"
@ -497,48 +498,63 @@ public:
typedef std::vector<ValueType> Values;
// If end is not present one key past the end of the map is used.
Future<Values> getRange(Reference<ReadYourWritesTransaction> tr,
ValueType const& begin,
template <class Transaction>
Future<Values> getRange(Transaction tr,
Optional<ValueType> const& begin,
Optional<ValueType> const& end,
int limit,
Snapshot snapshot = Snapshot::False) const {
Snapshot snapshot = Snapshot::False,
Reverse reverse = Reverse::False) const {
Subspace s = space; // 'this' could be invalid inside lambda
Key beginKey = begin.present() ? s.pack(Codec<ValueType>::pack(begin.get())) : space.range().begin;
Key endKey = end.present() ? s.pack(Codec<ValueType>::pack(end.get())) : space.range().end;
return map(
tr->getRange(KeyRangeRef(s.pack(Codec<ValueType>::pack(begin)), endKey), GetRangeLimits(limit), snapshot),
[s](RangeResult const& kvs) -> Values {
Values results;
for (int i = 0; i < kvs.size(); ++i) {
results.push_back(Codec<ValueType>::unpack(s.unpack(kvs[i].key)));
}
return results;
});
typename transaction_future_type<Transaction, RangeResult>::type getRangeFuture =
tr->getRange(KeyRangeRef(beginKey, endKey), GetRangeLimits(limit), snapshot, reverse);
return holdWhile(getRangeFuture,
map(safeThreadFutureToFuture(getRangeFuture), [s](RangeResult const& kvs) -> Values {
Values results;
for (int i = 0; i < kvs.size(); ++i) {
results.push_back(Codec<ValueType>::unpack(s.unpack(kvs[i].key)));
}
return results;
}));
}
Future<bool> exists(Reference<ReadYourWritesTransaction> tr,
ValueType const& val,
Snapshot snapshot = Snapshot::False) const {
return map(tr->get(space.pack(Codec<ValueType>::pack(val)), snapshot),
[](Optional<Value> const& val) -> bool { return val.present(); });
template <class Transaction>
Future<bool> exists(Transaction tr, ValueType const& val, Snapshot snapshot = Snapshot::False) const {
typename transaction_future_type<Transaction, Optional<Value>>::type getFuture =
tr->get(space.pack(Codec<ValueType>::pack(val)), snapshot);
return holdWhile(getFuture, map(safeThreadFutureToFuture(getFuture), [](Optional<Value> const& val) -> bool {
return val.present();
}));
}
// Returns the expectedSize of the set key
int insert(Reference<ReadYourWritesTransaction> tr, ValueType const& val) {
template <class Transaction>
int insert(Transaction tr, ValueType const& val) {
Key k = space.pack(Codec<ValueType>::pack(val));
tr->set(k, StringRef());
return k.expectedSize();
}
void erase(Reference<ReadYourWritesTransaction> tr, ValueType const& val) {
template <class Transaction>
void erase(Transaction tr, ValueType const& val) {
return tr->clear(space.pack(Codec<ValueType>::pack(val)));
}
void erase(Reference<ReadYourWritesTransaction> tr, ValueType const& begin, ValueType const& end) {
template <class Transaction>
void erase(Transaction tr, ValueType const& begin, ValueType const& end) {
return tr->clear(
KeyRangeRef(space.pack(Codec<ValueType>::pack(begin)), space.pack(Codec<ValueType>::pack(end))));
}
void clear(Reference<ReadYourWritesTransaction> tr) { return tr->clear(space.range()); }
template <class Transaction>
void clear(Transaction tr) {
return tr->clear(space.range());
}
Subspace space;
};

View File

@ -56,4 +56,10 @@ Key getDataClusterTenantGroupIndexKey(ClusterNameRef cluster, Optional<TenantGro
return dataClusterTenantGroupIndexKeys.begin.withSuffix(tuple.pack());
}
KeyBackedSet<Tuple> ManagementClusterMetadata::clusterCapacityIndex("metacluster/clusterCapacityIndex/"_sr);
Tuple ManagementClusterMetadata::getClusterCapacityTuple(ClusterNameRef const& clusterName,
DataClusterEntry const& entry) {
return Tuple().append(entry.allocated.numTenantGroups).append(clusterName);
}
}; // namespace MetaclusterAPI

View File

@ -29,6 +29,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/KeyBackedTypes.h"
#include "fdbclient/Metacluster.h"
#include "fdbclient/MultiVersionTransaction.h"
#include "fdbclient/SystemData.h"
@ -83,6 +84,12 @@ FDB_DECLARE_BOOLEAN_PARAM(RemoveMissingTenants);
namespace MetaclusterAPI {
struct ManagementClusterMetadata {
// A set of non-full clusters where the key is the tuple (num tenant groups allocated, cluster name).
static KeyBackedSet<Tuple> clusterCapacityIndex;
static Tuple getClusterCapacityTuple(ClusterNameRef const& clusterName, DataClusterEntry const& entry);
};
ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connectionString);
Key getDataClusterTenantIndexKey(ClusterNameRef cluster, Optional<TenantNameRef> tenant);
@ -263,17 +270,35 @@ Future<Void> decommissionMetacluster(Reference<DB> db) {
return Void();
}
template <class Transaction>
void updateClusterCapacityIndex(Transaction tr,
ClusterName name,
DataClusterEntry previousEntry,
DataClusterEntry updatedEntry) {
// Entries are put in the cluster capacity index ordered by how many items are already allocated to them
if (previousEntry.hasCapacity()) {
ManagementClusterMetadata::clusterCapacityIndex.erase(
tr, ManagementClusterMetadata::getClusterCapacityTuple(name, previousEntry));
}
if (updatedEntry.hasCapacity()) {
ManagementClusterMetadata::clusterCapacityIndex.insert(
tr, ManagementClusterMetadata::getClusterCapacityTuple(name, updatedEntry));
}
}
// This should only be called from a transaction that has already confirmed that the cluster entry
// is present. The updatedEntry should use the existing entry and modify only those fields that need
// to be changed.
template <class Transaction>
void updateClusterMetadata(Transaction tr,
ClusterNameRef name,
DataClusterMetadata previousMetadata,
Optional<ClusterConnectionString> updatedConnectionString,
Optional<DataClusterEntry> updatedEntry) {
if (updatedEntry.present()) {
tr->set(dataClusterMetadataPrefix.withSuffix(name), updatedEntry.get().encode());
updateClusterCapacityIndex(tr, name, previousMetadata.entry, updatedEntry.get());
}
if (updatedConnectionString.present()) {
tr->set(dataClusterConnectionRecordPrefix.withSuffix(name), updatedConnectionString.get().toString());
@ -319,6 +344,10 @@ Future<Void> managementClusterRegister(Transaction tr,
if (!result.second) {
entry.allocated = ClusterUsage();
if (entry.hasCapacity()) {
ManagementClusterMetadata::clusterCapacityIndex.insert(
tr, ManagementClusterMetadata::getClusterCapacityTuple(name, entry));
}
tr->set(dataClusterMetadataKey, entry.encode());
tr->set(dataClusterConnectionRecordKey, connectionString.toString());
}
@ -506,21 +535,26 @@ Future<std::pair<Optional<DataClusterMetadata>, bool>> managementClusterRemove(T
return std::make_pair(metadata, true);
}
bool purged = false;
if (checkEmpty && metadata.get().entry.allocated.numTenantGroups > 0) {
throw cluster_not_empty();
} else if (metadata.get().entry.allocated.numTenantGroups == 0) {
tr->clear(dataClusterMetadataKey);
tr->clear(dataClusterConnectionRecordKey);
return std::make_pair(metadata, true);
purged = true;
} else {
// We need to clean up the tenant metadata for this cluster before erasing it. While we are doing that,
// lock the entry to prevent other assignments.
DataClusterEntry updatedEntry = metadata.get().entry;
updatedEntry.locked = true;
updateClusterMetadata(tr, name, Optional<ClusterConnectionString>(), updatedEntry);
return std::make_pair(metadata, false);
updateClusterMetadata(tr, name, metadata.get(), Optional<ClusterConnectionString>(), updatedEntry);
}
ManagementClusterMetadata::clusterCapacityIndex.erase(
tr, ManagementClusterMetadata::getClusterCapacityTuple(name, metadata.get().entry));
return std::make_pair(metadata, purged);
}
ACTOR template <class DB>
@ -819,42 +853,50 @@ Future<std::map<ClusterName, DataClusterMetadata>> listClusters(Reference<DB> db
ACTOR template <class Transaction>
Future<std::pair<ClusterName, DataClusterMetadata>> assignTenant(Transaction tr, TenantMapEntry tenantEntry) {
// TODO: check for invalid tenant group name
// TODO: check that the chosen cluster is available, otherwise we can try another
state typename transaction_future_type<Transaction, Optional<Value>>::type groupMetadataFuture;
state bool creatingTenantGroup = true;
if (tenantEntry.tenantGroup.present()) {
if (tenantEntry.tenantGroup.get().startsWith("\xff"_sr)) {
throw invalid_tenant_group_name();
}
groupMetadataFuture = tr->get(tenantGroupMetadataKeys.begin.withSuffix(tenantEntry.tenantGroup.get()));
Optional<Value> groupMetadata = wait(safeThreadFutureToFuture(groupMetadataFuture));
if (groupMetadata.present()) {
creatingTenantGroup = false;
state TenantGroupEntry groupEntry = TenantGroupEntry::decode(groupMetadata.get());
Optional<DataClusterMetadata> clusterMetadata =
wait(tryGetClusterTransaction(tr, groupEntry.assignedCluster));
// TODO: This is only true if we clean up tenant state after force removal.
ASSERT(clusterMetadata.present());
return std::make_pair(groupEntry.assignedCluster, clusterMetadata.get());
}
}
// TODO: more efficient
std::map<ClusterName, DataClusterMetadata> clusters =
wait(listClustersTransaction(tr, ""_sr, "\xff"_sr, CLIENT_KNOBS->MAX_DATA_CLUSTERS));
state KeyBackedSet<Tuple>::Values availableClusters = wait(ManagementClusterMetadata::clusterCapacityIndex.getRange(
tr, Optional<Tuple>(), Optional<Tuple>(), 1, Snapshot::False, Reverse::True));
for (auto c : clusters) {
if (!creatingTenantGroup || c.second.entry.hasCapacity()) {
if (creatingTenantGroup) {
++c.second.entry.allocated.numTenantGroups;
updateClusterMetadata(tr, c.first, Optional<ClusterConnectionString>(), c.second.entry);
if (tenantEntry.tenantGroup.present()) {
tr->set(tenantGroupMetadataKeys.begin.withSuffix(tenantEntry.tenantGroup.get()),
TenantGroupEntry(c.first).encode());
}
}
return c;
state Optional<ClusterName> chosenCluster;
if (!availableClusters.empty()) {
// TODO: check that the chosen cluster is available, otherwise we can try another
chosenCluster = availableClusters[0].getString(1);
}
if (chosenCluster.present()) {
Optional<DataClusterMetadata> clusterMetadata = wait(tryGetClusterTransaction(tr, chosenCluster.get()));
ASSERT(clusterMetadata.present());
DataClusterEntry clusterEntry = clusterMetadata.get().entry;
ASSERT(clusterEntry.hasCapacity());
++clusterEntry.allocated.numTenantGroups;
updateClusterMetadata(
tr, chosenCluster.get(), clusterMetadata.get(), Optional<ClusterConnectionString>(), clusterEntry);
if (tenantEntry.tenantGroup.present()) {
tr->set(tenantGroupMetadataKeys.begin.withSuffix(tenantEntry.tenantGroup.get()),
TenantGroupEntry(chosenCluster.get()).encode());
}
return std::make_pair(chosenCluster.get(), clusterMetadata.get());
}
throw metacluster_no_capacity();
@ -1082,14 +1124,11 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
finalClusterMetadata.present() && !tenantEntry3.get().tenantGroup.present();
if (finalClusterMetadata.present()) {
managementTr->clear(getDataClusterTenantIndexKey(tenantEntry3.get().assignedCluster.get(), name),
""_sr);
managementTr->clear(getDataClusterTenantIndexKey(tenantEntry3.get().assignedCluster.get(), name));
if (tenantEntry3.get().tenantGroup.present()) {
managementTr->clear(getDataClusterTenantGroupIndexKey(tenantEntry3.get().assignedCluster.get(),
tenantEntry3.get().tenantGroup.get()),
""_sr);
tenantEntry3.get().tenantGroup.get()));
RangeResult result = wait(safeThreadFutureToFuture(tenantGroupIndexFuture));
if (result.size() == 0) {
managementTr->clear(
@ -1102,6 +1141,7 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
--updatedEntry.allocated.numTenantGroups;
updateClusterMetadata(managementTr,
tenantEntry3.get().assignedCluster.get(),
finalClusterMetadata.get(),
Optional<ClusterConnectionString>(),
updatedEntry);
}

View File

@ -197,7 +197,6 @@ ERROR( value_not_tuple, 2042, "The value cannot be parsed as a tuple" );
ERROR( mapper_not_tuple, 2043, "The mapper cannot be parsed as a tuple" );
ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" )
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )
ERROR( key_too_large, 2102, "Key length exceeds limit" )
@ -232,6 +231,7 @@ ERROR( tenants_disabled, 2136, "Tenants have been disabled in the cluster" )
ERROR( unknown_tenant, 2137, "Tenant is not available from this server" )
ERROR( illegal_tenant_access, 2138, "Illegal tenant access" )
ERROR( tenant_removed, 2139, "The tenant was removed" )
ERROR( invalid_tenant_group_name, 2140, "Tenant group name cannot begin with \\xff" )
ERROR( invalid_cluster_name, 2150, "Data cluster name cannot begin with \\xff" )
ERROR( invalid_metacluster_operation, 2151, "Metacluster operation performed on non-metacluster" )