Remove subordinate mode and instead use a key in the system key-space to track metacluster membership. Use this key to determine if a cluster is already part of a metacluster or can be configured away from required mode. Disallow configuring to or from a management cluster that has data.

This commit is contained in:
A.J. Beamon 2022-05-11 17:10:55 -07:00
parent 0025158aa1
commit 85019612d4
16 changed files with 431 additions and 325 deletions

View File

@ -64,7 +64,7 @@ The ``commit`` command commits the current transaction. Any sets or clears execu
configure
---------
The ``configure`` command changes the database configuration. Its syntax is ``configure [new|tss] [single|double|triple|three_data_hall|three_datacenter] [ssd|memory] [grv_proxies=<N>] [commit_proxies=<N>] [resolvers=<N>] [logs=<N>] [count=<TSS_COUNT>] [perpetual_storage_wiggle=<WIGGLE_SPEED>] [perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>] [storage_migration_type={disabled|aggressive|gradual}] [tenant_mode={disabled|optional|required|management|subordinate}]``.
The ``configure`` command changes the database configuration. Its syntax is ``configure [new|tss] [single|double|triple|three_data_hall|three_datacenter] [ssd|memory] [grv_proxies=<N>] [commit_proxies=<N>] [resolvers=<N>] [logs=<N>] [count=<TSS_COUNT>] [perpetual_storage_wiggle=<WIGGLE_SPEED>] [perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>] [storage_migration_type={disabled|aggressive|gradual}] [tenant_mode={disabled|optional|required|management}]``.
The ``new`` option, if present, initializes a new database with the given configuration rather than changing the configuration of an existing one. When ``new`` is used, both a redundancy mode and a storage engine must be specified.

View File

@ -784,8 +784,7 @@
"disabled",
"optional",
"required",
"management",
"subordinate"
"management"
]}
},
"data":{

View File

@ -30,7 +30,6 @@ FoundationDB clusters support the following tenant modes:
* ``optional`` - Tenants can be created. Each transaction can choose whether or not to use a tenant. This mode is primarily intended for migration and testing purposes, and care should be taken to avoid conflicts between tenant and non-tenant data.
* ``required`` - Tenants can be created. Each normal transaction must use a tenant. To support special access needs, transactions will be permitted to access the raw key-space using the ``RAW_ACCESS`` transaction option.
* ``management`` - The cluster is configured as the management cluster in a metacluster. Subordinate data clusters can be registered with this management cluster and created tenants will be assigned to them. This cluster is not allowed to store data.
* ``subordinate`` - The cluster is configured as a data cluster in a metacluster. Like ``required`` mode, except tenants cannot be created and deleted directly on a ``subordinate`` cluster.
Creating and deleting tenants
=============================

View File

@ -272,6 +272,14 @@ ACTOR Future<bool> configureCommandActor(Reference<IDatabase> db,
stderr,
"WARN: Sharded RocksDB storage engine type is still in experimental stage, not yet production tested.\n");
break;
case ConfigurationResult::DATABASE_NOT_EMPTY:
fprintf(stderr, "ERROR: The cluster configuration cannot be changed because the cluster is not empty.\n");
ret = false;
break;
case ConfigurationResult::DATABASE_IS_REGISTERED:
fprintf(stderr, "ERROR: A cluster cannot be configured out of `required' mode while part of a metacluster.\n");
ret = false;
break;
default:
ASSERT(false);
ret = false;
@ -317,7 +325,7 @@ CommandFactory configureFactory(
"commit_proxies=<COMMIT_PROXIES>|grv_proxies=<GRV_PROXIES>|logs=<LOGS>|resolvers=<RESOLVERS>>*|"
"count=<TSS_COUNT>|perpetual_storage_wiggle=<WIGGLE_SPEED>|perpetual_storage_wiggle_locality="
"<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>|storage_migration_type={disabled|gradual|aggressive}"
"|tenant_mode={disabled|optional|required|management|subordinate}|blob_granules_enabled={0|1}",
"|tenant_mode={disabled|optional|required|management}|blob_granules_enabled={0|1}",
"change the database configuration",
"The `new' option, if present, initializes a new database with the given configuration rather than changing "
"the configuration of an existing one. When used, both a redundancy mode and a storage engine must be "
@ -348,7 +356,7 @@ CommandFactory configureFactory(
"perpetual_storage_wiggle_locality=<<LOCALITY_KEY>:<LOCALITY_VALUE>|0>: Set the process filter for wiggling. "
"The processes that match the given locality key and locality value are only wiggled. The value 0 will disable "
"the locality filter and matches all the processes for wiggling.\n\n"
"tenant_mode=<disabled|optional|required|management|subordinate>: Sets the tenant mode for the cluster. If "
"tenant_mode=<disabled|optional|required|management>: Sets the tenant mode for the cluster. If "
"optional, then transactions can be run with or without specifying tenants. If required, all data must be "
"accessed using tenants.\n\n"

View File

@ -145,6 +145,7 @@ set(FDBCLIENT_SRCS
TaskBucket.h
Tenant.cpp
Tenant.h
TenantManagement.actor.h
TenantSpecialKeys.actor.cpp
TestKnobCollection.cpp
TestKnobCollection.h

View File

@ -1328,7 +1328,7 @@ struct TenantMode {
// These enumerated values are stored in the database configuration, so can NEVER be changed. Only add new ones
// just before END.
// Note: OPTIONAL_TENANT is not named OPTIONAL because of a collision with a Windows macro.
enum Mode { DISABLED = 0, OPTIONAL_TENANT = 1, REQUIRED = 2, MANAGEMENT = 3, SUBORDINATE = 4, END = 5 };
enum Mode { DISABLED = 0, OPTIONAL_TENANT = 1, REQUIRED = 2, MANAGEMENT = 3, END = 4 };
TenantMode() : mode(DISABLED) {}
TenantMode(Mode mode) : mode(mode) {
@ -1353,8 +1353,6 @@ struct TenantMode {
return "required";
case MANAGEMENT:
return "management";
case SUBORDINATE:
return "subordinate";
default:
ASSERT(false);
}

View File

@ -42,6 +42,7 @@ the contents of the system key space.
#include "fdbclient/DatabaseConfiguration.h"
#include "fdbclient/Status.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/TenantManagement.actor.h"
#include "flow/actorcompiler.h" // has to be last include
// ConfigurationResult enumerates normal outcomes of changeConfig() and various error
@ -70,6 +71,8 @@ enum class ConfigurationResult {
SUCCESS_WARN_SHARDED_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_ROCKSDB_EXPERIMENTAL,
DATABASE_CREATED_WARN_SHARDED_ROCKSDB_EXPERIMENTAL,
DATABASE_NOT_EMPTY,
DATABASE_IS_REGISTERED
};
enum class CoordinatorsResult {
@ -475,6 +478,43 @@ Future<ConfigurationResult> changeConfigTransaction(Transaction tr,
newConfig.storageServerStoreType == KeyValueStoreType::SSD_SHARDED_ROCKSDB) {
warnShardedRocksDBIsExperimental = true;
}
if (newConfig.tenantMode != oldConfig.tenantMode) {
if (newConfig.tenantMode == TenantMode::MANAGEMENT || oldConfig.tenantMode == TenantMode::MANAGEMENT) {
state Future<std::map<TenantName, TenantMapEntry>> tenantsFuture =
listTenantsTransaction(tr, ""_sr, "\xff\xff"_sr, 1);
state typename transaction_future_type<Transaction, RangeResult>::type dataClustersFuture =
tr->getRange(dataClusterMetadataKeys, 1);
if (newConfig.tenantMode == TenantMode::MANAGEMENT) {
state typename transaction_future_type<Transaction, RangeResult>::type dbContentsFuture =
tr->getRange(normalKeys, 1);
RangeResult dbContents = wait(safeThreadFutureToFuture(dbContentsFuture));
if (!dbContents.empty()) {
return ConfigurationResult::DATABASE_NOT_EMPTY;
}
}
RangeResult dataClusters = wait(safeThreadFutureToFuture(dataClustersFuture));
if (!dataClusters.empty()) {
return ConfigurationResult::DATABASE_NOT_EMPTY;
}
std::map<TenantName, TenantMapEntry> tenants = wait(tenantsFuture);
if (!tenants.empty()) {
return ConfigurationResult::DATABASE_NOT_EMPTY;
}
} else if (oldConfig.tenantMode == TenantMode::REQUIRED) {
state typename transaction_future_type<Transaction, Optional<Value>>::type dbRegistrationFuture =
tr->get(dataClusterRegistrationKey);
Optional<Value> dbRegistration = wait(safeThreadFutureToFuture(dbRegistrationFuture));
if (dbRegistration.present()) {
return ConfigurationResult::DATABASE_IS_REGISTERED;
}
}
}
}
}
if (creating) {
@ -687,269 +727,6 @@ Future<ConfigurationResult> changeConfig(Reference<DB> db,
// used by special keys and fdbcli
std::string generateErrorMessage(const CoordinatorsResult& res);
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantName name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantFuture = tr->get(tenantMapKey);
Optional<Value> val = wait(safeThreadFutureToFuture(tenantFuture));
return val.map<TenantMapEntry>([](Optional<Value> v) { return decodeTenantEntry(v.get()); });
}
ACTOR template <class DB>
Future<Optional<TenantMapEntry>> tryGetTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
return entry;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<TenantMapEntry> getTenantTransaction(Transaction tr, TenantName name) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (!entry.present()) {
throw tenant_not_found();
}
return entry.get();
}
ACTOR template <class DB>
Future<TenantMapEntry> getTenant(Reference<DB> db, TenantName name) {
Optional<TenantMapEntry> entry = wait(tryGetTenant(db, name));
if (!entry.present()) {
throw tenant_not_found();
}
return entry.get();
}
// Creates a tenant with the given name. If the tenant already exists, an empty optional will be returned.
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr,
TenantNameRef name,
TenantMapEntry tenantEntry = TenantMapEntry()) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
if (name.startsWith("\xff"_sr)) {
throw invalid_tenant_name();
}
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<TenantMapEntry>> existingEntryFuture = tryGetTenantTransaction(tr, name);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantDataPrefixFuture =
tr->get(tenantDataPrefixKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture = tr->get(tenantLastIdKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) {
throw tenants_disabled();
}
Optional<TenantMapEntry> existingEntry = wait(existingEntryFuture);
if (existingEntry.present()) {
return Optional<TenantMapEntry>();
}
state Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture));
Optional<Value> tenantDataPrefix = wait(safeThreadFutureToFuture(tenantDataPrefixFuture));
if (tenantDataPrefix.present() &&
tenantDataPrefix.get().size() + TenantMapEntry::ROOT_PREFIX_SIZE > CLIENT_KNOBS->TENANT_PREFIX_SIZE_LIMIT) {
TraceEvent(SevWarnAlways, "TenantPrefixTooLarge")
.detail("TenantSubspace", tenantDataPrefix.get())
.detail("TenantSubspaceLength", tenantDataPrefix.get().size())
.detail("RootPrefixLength", TenantMapEntry::ROOT_PREFIX_SIZE)
.detail("MaxTenantPrefixSize", CLIENT_KNOBS->TENANT_PREFIX_SIZE_LIMIT);
throw client_invalid_operation();
}
tenantEntry.id = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0;
tenantEntry.setSubspace(tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
tr->getRange(prefixRange(tenantEntry.prefix), 1);
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
if (!contents.empty()) {
throw tenant_prefix_allocator_conflict();
}
tr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(tenantEntry.id));
tr->set(tenantMapKey, encodeTenantEntry(tenantEntry));
return tenantEntry;
}
ACTOR template <class DB>
Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tenantEntry = TenantMapEntry()) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (entry.present()) {
throw tenant_already_exists();
}
firstTry = false;
}
state Optional<TenantMapEntry> newTenant = wait(createTenantTransaction(tr, name, tenantEntry));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("CreatedTenant")
.detail("Tenant", name)
.detail("TenantId", newTenant.present() ? newTenant.get().id : -1)
.detail("Prefix", newTenant.present() ? (StringRef)newTenant.get().prefix : "Unknown"_sr)
.detail("TenantGroup", tenantEntry.tenantGroup)
.detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, name));
if (!tenantEntry.present()) {
return Void();
}
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
tr->getRange(prefixRange(tenantEntry.get().prefix), 1);
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
if (!contents.empty()) {
throw tenant_not_empty();
}
tr->clear(tenantMapKey);
return Void();
}
ACTOR template <class DB>
Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (!entry.present()) {
throw tenant_not_found();
}
firstTry = false;
}
wait(deleteTenantTransaction(tr, name));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("DeletedTenant").detail("Tenant", name).detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
// This should only be called from a transaction that has already confirmed that the cluster entry
// is present. The updatedEntry should use the existing entry and modify only those fields that need
// to be changed.
template <class Transaction>
void configureTenantTransaction(Transaction tr, TenantNameRef tenantName, TenantMapEntry tenantEntry) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->set(tenantName.withPrefix(tenantMapPrefix), encodeTenantEntry(tenantEntry));
}
ACTOR template <class Transaction>
Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction tr,
TenantNameRef begin,
TenantNameRef end,
int limit) {
state KeyRange range = KeyRangeRef(begin, end).withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state typename transaction_future_type<Transaction, RangeResult>::type listFuture =
tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit);
RangeResult results = wait(safeThreadFutureToFuture(listFuture));
std::map<TenantName, TenantMapEntry> tenants;
for (auto kv : results) {
tenants[kv.key.removePrefix(tenantMapPrefix)] = decodeTenantEntry(kv.value);
}
return tenants;
}
ACTOR template <class DB>
Future<std::map<TenantName, TenantMapEntry>> listTenants(Reference<DB> db,
TenantName begin,
TenantName end,
int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::map<TenantName, TenantMapEntry> tenants = wait(listTenantsTransaction(tr, begin, end, limit));
return tenants;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
} // namespace ManagementAPI
#include "flow/unactorcompiler.h"

View File

@ -196,10 +196,8 @@ std::map<std::string, std::string> configForToken(std::string const& mode) {
tenantMode = TenantMode::REQUIRED;
} else if (value == "management") {
tenantMode = TenantMode::MANAGEMENT;
} else if (value == "subordinate") {
tenantMode = TenantMode::SUBORDINATE;
} else {
printf("Error: Only disabled|optional|required|management|subordinate are valid for tenant_mode.\n");
printf("Error: Only disabled|optional|required|management are valid for tenant_mode.\n");
return out;
}
out[p + key] = format("%d", tenantMode);

View File

@ -77,10 +77,40 @@ struct DataClusterEntry {
DataClusterEntry(int64_t id, ClusterUsage capacity, ClusterUsage allocated)
: id(id), capacity(capacity), allocated(allocated) {}
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
static DataClusterEntry decode(ValueRef const& value) {
DataClusterEntry entry;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(entry);
return entry;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, capacity, allocated);
}
};
struct DataClusterRegistrationEntry {
constexpr static FileIdentifier file_identifier = 13448589;
ClusterName name;
DataClusterRegistrationEntry() = default;
DataClusterRegistrationEntry(ClusterName name) : name(name) {}
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
static DataClusterRegistrationEntry decode(ValueRef const& value) {
DataClusterRegistrationEntry entry;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(entry);
return entry;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, name);
}
};
#endif

View File

@ -73,13 +73,11 @@ ACTOR Future<RangeResult> getDataClusterList(ReadYourWritesTransaction* ryw,
RangeResult results;
for (auto cluster : clusters) {
ValueRef value =
ObjectWriter::toValue(cluster.second, IncludeVersion(ProtocolVersion::withMetacluster()), results.arena());
results.push_back(
results.arena(),
KeyValueRef(cluster.first.withPrefix(
"\xff\xff/metacluster_internal/management_cluster/data_cluster/map/"_sr, results.arena()),
value));
cluster.second.encode(results.arena())));
}
return results;

View File

@ -46,6 +46,17 @@ struct DataClusterMetadata {
DataClusterMetadata(DataClusterEntry const& entry, ClusterConnectionString const& connectionString)
: entry(entry), connectionString(connectionString) {}
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster())); }
Value encode(Arena& arena) {
return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withMetacluster()), arena);
}
static DataClusterMetadata decode(ValueRef const& value) {
DataClusterMetadata metadata;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(metadata);
return metadata;
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, connectionString, entry);
@ -75,7 +86,7 @@ Future<Optional<DataClusterMetadata>> managementClusterTryGetCluster(Transaction
if (metadata.present()) {
ASSERT(connectionString.present());
return Optional<DataClusterMetadata>(DataClusterMetadata(
decodeDataClusterEntry(metadata.get()), ClusterConnectionString(connectionString.get().toString())));
DataClusterEntry::decode(metadata.get()), ClusterConnectionString(connectionString.get().toString())));
} else {
return Optional<DataClusterMetadata>();
}
@ -91,10 +102,7 @@ Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, C
state Optional<Value> clusterEntry = wait(safeThreadFutureToFuture(clusterEntryFuture));
if (clusterEntry.present()) {
DataClusterMetadata metadata;
ObjectReader reader(clusterEntry.get().begin(), IncludeVersion());
reader.deserialize(metadata);
return metadata;
return DataClusterMetadata::decode(clusterEntry.get());
} else {
return Optional<DataClusterMetadata>();
}
@ -145,7 +153,7 @@ Future<Void> managementClusterUpdateClusterMetadata(Transaction tr, ClusterNameR
throw cluster_not_found();
}
tr->set(name.withPrefix(dataClusterMetadataPrefix), encodeDataClusterEntry(metadata.entry));
tr->set(name.withPrefix(dataClusterMetadataPrefix), metadata.entry.encode());
tr->set(name.withPrefix(dataClusterConnectionRecordPrefix), metadata.connectionString.toString());
return Void();
@ -207,7 +215,7 @@ Future<Void> managementClusterRegister(Transaction tr,
entry.allocated = ClusterUsage();
tr->set(dataClusterLastIdKey, DataClusterEntry::idToValue(entry.id));
tr->set(dataClusterMetadataKey, encodeDataClusterEntry(entry));
tr->set(dataClusterMetadataKey, entry.encode());
tr->set(dataClusterConnectionRecordKey, connectionString);
return Void();
@ -219,6 +227,8 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name) {
ManagementAPI::listTenantsTransaction(tr, ""_sr, "\xff\xff"_sr, 1);
state typename transaction_future_type<Transaction, RangeResult>::type existingDataFuture =
tr->getRange(normalKeys, 1);
state typename transaction_future_type<Transaction, Optional<Value>>::type clusterNameFuture =
tr->get(dataClusterRegistrationKey);
std::map<TenantName, TenantMapEntry> existingTenants = wait(safeThreadFutureToFuture(existingTenantsFuture));
if (!existingTenants.empty()) {
@ -232,7 +242,15 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name) {
throw cluster_not_empty();
}
std::vector<StringRef> tokens = { "tenant_mode=subordinate"_sr };
// TODO: this is not idempotent
Optional<Value> storedClusterName = wait(safeThreadFutureToFuture(clusterNameFuture));
if (storedClusterName.present()) {
throw cluster_already_registered();
}
tr->set(dataClusterRegistrationKey, DataClusterRegistrationEntry(name).encode());
std::vector<StringRef> tokens = { "tenant_mode=required"_sr };
ConfigurationResult configResult =
wait(ManagementAPI::changeConfigTransaction(tr, tokens, Optional<ConfigureAutoResult>(), false, false));
@ -244,8 +262,6 @@ Future<Void> dataClusterRegister(Transaction tr, ClusterNameRef name) {
throw cluster_configuration_failure();
}
// TODO: store the cluster name somewhere
return Void();
}
@ -401,20 +417,9 @@ Future<Void> managementClusterRemove(Transaction tr, ClusterNameRef name) {
return Void();
}
ACTOR template <class Transaction>
template <class Transaction>
Future<Void> dataClusterRemove(Transaction tr) {
// TODO: is there any other state to remove?
std::vector<StringRef> tokens = { "tenant_mode=required"_sr };
ConfigurationResult configResult =
wait(ManagementAPI::changeConfigTransaction(tr, tokens, Optional<ConfigureAutoResult>(), false, false));
if (configResult != ConfigurationResult::SUCCESS) {
TraceEvent(SevWarn, "CouldNotConfigureDataCluster").detail("ConfigurationResult", configResult);
throw cluster_configuration_failure();
}
tr->clear(dataClusterRegistrationKey);
return Void();
}
@ -509,8 +514,9 @@ Future<std::map<ClusterName, DataClusterMetadata>> managementClusterListClusters
std::map<ClusterName, DataClusterMetadata> clusters;
for (int i = 0; i < metadata.size(); ++i) {
clusters[metadata[i].key.removePrefix(dataClusterMetadataPrefix)] = DataClusterMetadata(
decodeDataClusterEntry(metadata[i].value), ClusterConnectionString(connectionStrings[i].value.toString()));
clusters[metadata[i].key.removePrefix(dataClusterMetadataPrefix)] =
DataClusterMetadata(DataClusterEntry::decode(metadata[i].value),
ClusterConnectionString(connectionStrings[i].value.toString()));
}
return clusters;
@ -538,8 +544,9 @@ Future<std::map<ClusterName, DataClusterMetadata>> listClustersTransaction(Trans
std::map<ClusterName, DataClusterMetadata> clusters;
for (int i = 0; i < metadata.size(); ++i) {
clusters[metadata[i].key.removePrefix(dataClusterMetadataPrefix)] = DataClusterMetadata(
decodeDataClusterEntry(metadata[i].value), ClusterConnectionString(connectionStrings[i].value.toString()));
clusters[metadata[i].key.removePrefix(dataClusterMetadataPrefix)] =
DataClusterMetadata(DataClusterEntry::decode(metadata[i].value),
ClusterConnectionString(connectionStrings[i].value.toString()));
}
return clusters;
@ -563,10 +570,7 @@ Future<std::map<ClusterName, DataClusterMetadata>> listClusters(Reference<DB> db
std::map<ClusterName, DataClusterMetadata> clusters;
for (auto result : results) {
DataClusterMetadata metadata;
ObjectReader reader(result.value.begin(), IncludeVersion());
reader.deserialize(metadata);
clusters[result.key.removePrefix(prefix)] = metadata;
clusters[result.key.removePrefix(prefix)] = DataClusterMetadata::decode(result.value);
}
return clusters;

View File

@ -820,8 +820,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"disabled",
"optional",
"required",
"management",
"subordinate"
"management"
]}
},
"data":{

View File

@ -1408,7 +1408,7 @@ const KeyRef tenantMapPrivatePrefix = "\xff\xff/tenantMap/"_sr;
const KeyRef tenantLastIdKey = "\xff/tenantLastId/"_sr;
const KeyRef tenantDataPrefixKey = "\xff/tenantDataPrefix"_sr;
// Metacluster keys
// Metacluster management cluster keys
const KeyRangeRef dataClusterMetadataKeys("\xff/metacluster/dataCluster/metadata/"_sr,
"\xff/metacluster/dataCluster/metadata0"_sr);
const KeyRef dataClusterMetadataPrefix = dataClusterMetadataKeys.begin;
@ -1417,16 +1417,8 @@ const KeyRangeRef dataClusterConnectionRecordKeys("\xff/metacluster/dataCluster/
const KeyRef dataClusterConnectionRecordPrefix = dataClusterConnectionRecordKeys.begin;
const KeyRef dataClusterLastIdKey = "\xff/metacluster/dataCluster/lastId/"_sr;
Value encodeDataClusterEntry(DataClusterEntry const& dataClusterEntry) {
return ObjectWriter::toValue(dataClusterEntry, IncludeVersion());
}
DataClusterEntry decodeDataClusterEntry(ValueRef const& value) {
DataClusterEntry entry;
ObjectReader reader(value.begin(), IncludeVersion());
reader.deserialize(entry);
return entry;
}
// Metacluster data cluster keys
const KeyRef dataClusterRegistrationKey = "\xff/metacluster/dataCluster/clusterName"_sr;
// for tests
void testSSISerdes(StorageServerInterface const& ssi) {

View File

@ -634,9 +634,7 @@ extern const KeyRef dataClusterMetadataPrefix;
extern const KeyRangeRef dataClusterConnectionRecordKeys;
extern const KeyRef dataClusterConnectionRecordPrefix;
extern const KeyRef dataClusterLastIdKey;
Value encodeDataClusterEntry(DataClusterEntry const& dataClusterEntry);
DataClusterEntry decodeDataClusterEntry(ValueRef const& value);
extern const KeyRef dataClusterRegistrationKey;
#pragma clang diagnostic pop

View File

@ -0,0 +1,303 @@
/*
* TenantManagement.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_TENANT_MANAGEMENT_ACTOR_G_H)
#define FDBCLIENT_TENANT_MANAGEMENT_ACTOR_G_H
#include "fdbclient/TenantManagement.actor.g.h"
#elif !defined(FDBCLIENT_TENANT_MANAGEMENT_ACTOR_H)
#define FDBCLIENT_TENANT_MANAGEMENT_ACTOR_H
#include <string>
#include <map>
#include "fdbclient/GenericTransactionHelper.h"
#include "fdbclient/SystemData.h"
#include "flow/actorcompiler.h" // has to be last include
namespace ManagementAPI {
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> tryGetTenantTransaction(Transaction tr, TenantName name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantFuture = tr->get(tenantMapKey);
Optional<Value> val = wait(safeThreadFutureToFuture(tenantFuture));
return val.map<TenantMapEntry>([](Optional<Value> v) { return decodeTenantEntry(v.get()); });
}
ACTOR template <class DB>
Future<Optional<TenantMapEntry>> tryGetTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
return entry;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<TenantMapEntry> getTenantTransaction(Transaction tr, TenantName name) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (!entry.present()) {
throw tenant_not_found();
}
return entry.get();
}
ACTOR template <class DB>
Future<TenantMapEntry> getTenant(Reference<DB> db, TenantName name) {
Optional<TenantMapEntry> entry = wait(tryGetTenant(db, name));
if (!entry.present()) {
throw tenant_not_found();
}
return entry.get();
}
// Creates a tenant with the given name. If the tenant already exists, an empty optional will be returned.
ACTOR template <class Transaction>
Future<Optional<TenantMapEntry>> createTenantTransaction(Transaction tr,
TenantNameRef name,
TenantMapEntry tenantEntry = TenantMapEntry()) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
if (name.startsWith("\xff"_sr)) {
throw invalid_tenant_name();
}
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Optional<TenantMapEntry>> existingEntryFuture = tryGetTenantTransaction(tr, name);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantDataPrefixFuture =
tr->get(tenantDataPrefixKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type lastIdFuture = tr->get(tenantLastIdKey);
state typename transaction_future_type<Transaction, Optional<Value>>::type tenantModeFuture =
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
// TODO: disallow tenant creation directly on clusters in a metacluster
if (!tenantMode.present() || tenantMode.get() == StringRef(format("%d", TenantMode::DISABLED))) {
throw tenants_disabled();
}
Optional<TenantMapEntry> existingEntry = wait(existingEntryFuture);
if (existingEntry.present()) {
return Optional<TenantMapEntry>();
}
state Optional<Value> lastIdVal = wait(safeThreadFutureToFuture(lastIdFuture));
Optional<Value> tenantDataPrefix = wait(safeThreadFutureToFuture(tenantDataPrefixFuture));
if (tenantDataPrefix.present() &&
tenantDataPrefix.get().size() + TenantMapEntry::ROOT_PREFIX_SIZE > CLIENT_KNOBS->TENANT_PREFIX_SIZE_LIMIT) {
TraceEvent(SevWarnAlways, "TenantPrefixTooLarge")
.detail("TenantSubspace", tenantDataPrefix.get())
.detail("TenantSubspaceLength", tenantDataPrefix.get().size())
.detail("RootPrefixLength", TenantMapEntry::ROOT_PREFIX_SIZE)
.detail("MaxTenantPrefixSize", CLIENT_KNOBS->TENANT_PREFIX_SIZE_LIMIT);
throw client_invalid_operation();
}
tenantEntry.id = lastIdVal.present() ? TenantMapEntry::prefixToId(lastIdVal.get()) + 1 : 0;
tenantEntry.setSubspace(tenantDataPrefix.present() ? (KeyRef)tenantDataPrefix.get() : ""_sr);
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
tr->getRange(prefixRange(tenantEntry.prefix), 1);
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
if (!contents.empty()) {
throw tenant_prefix_allocator_conflict();
}
tr->set(tenantLastIdKey, TenantMapEntry::idToPrefix(tenantEntry.id));
tr->set(tenantMapKey, encodeTenantEntry(tenantEntry));
return tenantEntry;
}
ACTOR template <class DB>
Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tenantEntry = TenantMapEntry()) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (entry.present()) {
throw tenant_already_exists();
}
firstTry = false;
}
state Optional<TenantMapEntry> newTenant = wait(createTenantTransaction(tr, name, tenantEntry));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("CreatedTenant")
.detail("Tenant", name)
.detail("TenantId", newTenant.present() ? newTenant.get().id : -1)
.detail("Prefix", newTenant.present() ? (StringRef)newTenant.get().prefix : "Unknown"_sr)
.detail("TenantGroup", tenantEntry.tenantGroup)
.detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<Void> deleteTenantTransaction(Transaction tr, TenantNameRef name) {
state Key tenantMapKey = name.withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, name));
if (!tenantEntry.present()) {
return Void();
}
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
tr->getRange(prefixRange(tenantEntry.get().prefix), 1);
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
if (!contents.empty()) {
throw tenant_not_empty();
}
tr->clear(tenantMapKey);
return Void();
}
ACTOR template <class DB>
Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
state bool firstTry = true;
loop {
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (firstTry) {
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
if (!entry.present()) {
throw tenant_not_found();
}
firstTry = false;
}
wait(deleteTenantTransaction(tr, name));
if (BUGGIFY) {
throw commit_unknown_result();
}
wait(safeThreadFutureToFuture(tr->commit()));
if (BUGGIFY) {
throw commit_unknown_result();
}
TraceEvent("DeletedTenant").detail("Tenant", name).detail("Version", tr->getCommittedVersion());
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
// This should only be called from a transaction that has already confirmed that the cluster entry
// is present. The updatedEntry should use the existing entry and modify only those fields that need
// to be changed.
template <class Transaction>
void configureTenantTransaction(Transaction tr, TenantNameRef tenantName, TenantMapEntry tenantEntry) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->set(tenantName.withPrefix(tenantMapPrefix), encodeTenantEntry(tenantEntry));
}
ACTOR template <class Transaction>
Future<std::map<TenantName, TenantMapEntry>> listTenantsTransaction(Transaction tr,
TenantNameRef begin,
TenantNameRef end,
int limit) {
state KeyRange range = KeyRangeRef(begin, end).withPrefix(tenantMapPrefix);
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
state typename transaction_future_type<Transaction, RangeResult>::type listFuture =
tr->getRange(firstGreaterOrEqual(range.begin), firstGreaterOrEqual(range.end), limit);
RangeResult results = wait(safeThreadFutureToFuture(listFuture));
std::map<TenantName, TenantMapEntry> tenants;
for (auto kv : results) {
tenants[kv.key.removePrefix(tenantMapPrefix)] = decodeTenantEntry(kv.value);
}
return tenants;
}
ACTOR template <class DB>
Future<std::map<TenantName, TenantMapEntry>> listTenants(Reference<DB> db,
TenantName begin,
TenantName end,
int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::map<TenantName, TenantMapEntry> tenants = wait(listTenantsTransaction(tr, begin, end, limit));
return tenants;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
} // namespace ManagementAPI
#include "flow/unactorcompiler.h"
#endif

View File

@ -237,6 +237,8 @@ ERROR( invalid_metacluster_operation, 2151, "Metacluster operation performed on
ERROR( cluster_already_exists, 2152, "A data cluster with the given name already exists" )
ERROR( cluster_not_found, 2153, "Data cluster does not exist" )
ERROR( cluster_not_empty, 2154, "Data cluster must be empty" )
ERROR( cluster_configuration_failure, 2155, "Could not configure cluster" )
ERROR( cluster_already_registered, 2156, "Data cluster is already registered with a metacluster" )
// 2200 - errors from bindings and official APIs
ERROR( api_version_unset, 2200, "API version is not set" )