Add the ability to configure tenant groups in a metacluster
This commit is contained in:
parent
860d3843cc
commit
8f77048e95
|
@ -95,15 +95,6 @@ void applyConfigurationToSpecialKeys(Reference<ITransaction> tr,
|
|||
}
|
||||
}
|
||||
|
||||
void applyConfigurationToTenantMapEntry(std::map<Standalone<StringRef>, Optional<Value>> configuration,
|
||||
TenantMapEntry& entry) {
|
||||
for (auto [configName, value] : configuration) {
|
||||
if (configName == "tenant_group"_sr) {
|
||||
entry.tenantGroup = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createtenant command
|
||||
ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
|
||||
if (tokens.size() < 2 || tokens.size() > 3) {
|
||||
|
@ -123,28 +114,27 @@ ACTOR Future<bool> createTenantCommandActor(Reference<IDatabase> db, std::vector
|
|||
}
|
||||
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
|
||||
try {
|
||||
state Future<ClusterType> clusterTypeFuture = TenantAPI::getClusterType(tr);
|
||||
|
||||
if (!doneExistenceCheck) {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<Optional<Value>> existingTenantFuture = tr->get(tenantNameKey);
|
||||
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture));
|
||||
if (existingTenant.present()) {
|
||||
throw tenant_already_exists();
|
||||
}
|
||||
doneExistenceCheck = true;
|
||||
}
|
||||
|
||||
ClusterType clusterType = wait(clusterTypeFuture);
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
TenantMapEntry tenantEntry;
|
||||
applyConfigurationToTenantMapEntry(configuration.get(), tenantEntry);
|
||||
for (auto const& [name, value] : configuration.get()) {
|
||||
tenantEntry.configure(name, value);
|
||||
}
|
||||
wait(MetaclusterAPI::createTenant(db, tokens[1], tenantEntry));
|
||||
} else {
|
||||
if (!doneExistenceCheck) {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<Optional<Value>> existingTenantFuture = tr->get(tenantNameKey);
|
||||
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture));
|
||||
if (existingTenant.present()) {
|
||||
throw tenant_already_exists();
|
||||
}
|
||||
doneExistenceCheck = true;
|
||||
}
|
||||
|
||||
tr->set(tenantNameKey, ValueRef());
|
||||
applyConfigurationToSpecialKeys(tr, tokens[1], configuration.get());
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
|
@ -185,28 +175,27 @@ ACTOR Future<bool> deleteTenantCommandActor(Reference<IDatabase> db, std::vector
|
|||
state bool doneExistenceCheck = false;
|
||||
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
try {
|
||||
state Future<ClusterType> clusterTypeFuture = TenantAPI::getClusterType(tr);
|
||||
|
||||
if (!doneExistenceCheck) {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<Optional<Value>> existingTenantFuture = tr->get(tenantNameKey);
|
||||
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture));
|
||||
if (!existingTenant.present()) {
|
||||
throw tenant_not_found();
|
||||
}
|
||||
doneExistenceCheck = true;
|
||||
}
|
||||
|
||||
ClusterType clusterType = wait(clusterTypeFuture);
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
wait(MetaclusterAPI::deleteTenant(db, tokens[1]));
|
||||
} else {
|
||||
if (!doneExistenceCheck) {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<Optional<Value>> existingTenantFuture = tr->get(tenantNameKey);
|
||||
Optional<Value> existingTenant = wait(safeThreadFutureToFuture(existingTenantFuture));
|
||||
if (!existingTenant.present()) {
|
||||
throw tenant_not_found();
|
||||
}
|
||||
doneExistenceCheck = true;
|
||||
}
|
||||
|
||||
tr->clear(tenantNameKey);
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
}
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
state Error err(e);
|
||||
|
@ -237,8 +226,8 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
|
|||
return false;
|
||||
}
|
||||
|
||||
StringRef beginTenant = ""_sr;
|
||||
StringRef endTenant = "\xff\xff"_sr;
|
||||
state StringRef beginTenant = ""_sr;
|
||||
state StringRef endTenant = "\xff\xff"_sr;
|
||||
state int limit = 100;
|
||||
|
||||
if (tokens.size() >= 2) {
|
||||
|
@ -265,12 +254,26 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
|
|||
|
||||
loop {
|
||||
try {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<RangeResult> kvsFuture =
|
||||
tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit);
|
||||
RangeResult tenants = wait(safeThreadFutureToFuture(kvsFuture));
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
|
||||
state std::vector<TenantNameRef> tenantNames;
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
|
||||
wait(MetaclusterAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));
|
||||
for (auto tenant : tenants) {
|
||||
tenantNames.push_back(tenant.first);
|
||||
}
|
||||
} else {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<RangeResult> kvsFuture =
|
||||
tr->getRange(firstGreaterOrEqual(beginTenantKey), firstGreaterOrEqual(endTenantKey), limit);
|
||||
RangeResult tenants = wait(safeThreadFutureToFuture(kvsFuture));
|
||||
for (auto tenant : tenants) {
|
||||
tenantNames.push_back(tenant.key.removePrefix(tenantMapSpecialKeyRange.begin));
|
||||
}
|
||||
}
|
||||
|
||||
if (tenants.empty()) {
|
||||
if (tenantNames.empty()) {
|
||||
if (tokens.size() == 1) {
|
||||
fmt::print("The cluster has no tenants\n");
|
||||
} else {
|
||||
|
@ -279,9 +282,8 @@ ACTOR Future<bool> listTenantsCommandActor(Reference<IDatabase> db, std::vector<
|
|||
}
|
||||
|
||||
int index = 0;
|
||||
for (auto tenant : tenants) {
|
||||
fmt::print(
|
||||
" {}. {}\n", ++index, printable(tenant.key.removePrefix(tenantMapSpecialKeyRange.begin)).c_str());
|
||||
for (auto tenantName : tenantNames) {
|
||||
fmt::print(" {}. {}\n", ++index, printable(tenantName).c_str());
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -317,15 +319,24 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
|
|||
|
||||
loop {
|
||||
try {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<Optional<Value>> tenantFuture = tr->get(tenantNameKey);
|
||||
Optional<Value> tenant = wait(safeThreadFutureToFuture(tenantFuture));
|
||||
if (!tenant.present()) {
|
||||
throw tenant_not_found();
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
|
||||
state std::string tenantJson;
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
TenantMapEntry entry = wait(MetaclusterAPI::getTenantTransaction(tr, tokens[1]));
|
||||
tenantJson = entry.toJson(apiVersion);
|
||||
} else {
|
||||
// Hold the reference to the standalone's memory
|
||||
state ThreadFuture<Optional<Value>> tenantFuture = tr->get(tenantNameKey);
|
||||
Optional<Value> tenant = wait(safeThreadFutureToFuture(tenantFuture));
|
||||
if (!tenant.present()) {
|
||||
throw tenant_not_found();
|
||||
}
|
||||
tenantJson = tenant.get().toString();
|
||||
}
|
||||
|
||||
json_spirit::mValue jsonObject;
|
||||
json_spirit::read_string(tenant.get().toString(), jsonObject);
|
||||
json_spirit::read_string(tenantJson, jsonObject);
|
||||
|
||||
if (useJson) {
|
||||
json_spirit::mObject resultObj;
|
||||
|
@ -420,10 +431,17 @@ ACTOR Future<bool> configureTenantCommandActor(Reference<IDatabase> db, std::vec
|
|||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
try {
|
||||
applyConfigurationToSpecialKeys(tr, tokens[1], configuration.get());
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
|
||||
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
TenantMapEntry tenantEntry;
|
||||
wait(MetaclusterAPI::configureTenant(db, tokens[1], configuration.get()));
|
||||
} else {
|
||||
applyConfigurationToSpecialKeys(tr, tokens[1], configuration.get());
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
}
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
state Error err(e);
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/Tenant.h"
|
||||
#include "fdbclient/libb64/encode.h"
|
||||
#include "flow/UnitTest.h"
|
||||
|
||||
Key TenantMapEntry::idToPrefix(int64_t id) {
|
||||
|
@ -43,6 +45,8 @@ std::string TenantMapEntry::tenantStateToString(TenantState tenantState) {
|
|||
return "ready";
|
||||
case TenantState::REMOVING:
|
||||
return "removing";
|
||||
case TenantState::UPDATING_CONFIGURATION:
|
||||
return "updating configuration";
|
||||
case TenantState::ERROR:
|
||||
return "error";
|
||||
default:
|
||||
|
@ -57,6 +61,8 @@ TenantState TenantMapEntry::stringToTenantState(std::string stateStr) {
|
|||
return TenantState::READY;
|
||||
} else if (stateStr == "removing") {
|
||||
return TenantState::REMOVING;
|
||||
} else if (stateStr == "updating configuration") {
|
||||
return TenantState::UPDATING_CONFIGURATION;
|
||||
} else if (stateStr == "error") {
|
||||
return TenantState::ERROR;
|
||||
}
|
||||
|
@ -92,6 +98,51 @@ bool TenantMapEntry::matchesConfiguration(TenantMapEntry const& other) const {
|
|||
return tenantGroup == other.tenantGroup;
|
||||
}
|
||||
|
||||
void TenantMapEntry::configure(Standalone<StringRef> parameter, Optional<Value> value) {
|
||||
if (parameter == "tenant_group"_sr) {
|
||||
tenantGroup = value;
|
||||
} else {
|
||||
TraceEvent(SevWarnAlways, "UnknownTenantConfigurationParameter").detail("Parameter", parameter);
|
||||
throw invalid_tenant_configuration();
|
||||
}
|
||||
}
|
||||
|
||||
std::string TenantMapEntry::toJson(int apiVersion) const {
|
||||
json_spirit::mObject tenantEntry;
|
||||
tenantEntry["id"] = id;
|
||||
|
||||
if (apiVersion >= 720 || apiVersion == Database::API_VERSION_LATEST) {
|
||||
json_spirit::mObject prefixObject;
|
||||
std::string encodedPrefix = base64::encoder::from_string(prefix.toString());
|
||||
// Remove trailing newline
|
||||
encodedPrefix.resize(encodedPrefix.size() - 1);
|
||||
|
||||
prefixObject["base64"] = encodedPrefix;
|
||||
prefixObject["printable"] = printable(prefix);
|
||||
tenantEntry["prefix"] = prefixObject;
|
||||
} else {
|
||||
// This is not a standard encoding in JSON, and some libraries may not be able to easily decode it
|
||||
tenantEntry["prefix"] = prefix.toString();
|
||||
}
|
||||
|
||||
tenantEntry["tenant_state"] = TenantMapEntry::tenantStateToString(tenantState);
|
||||
if (assignedCluster.present()) {
|
||||
tenantEntry["assigned_cluster"] = assignedCluster.get().toString();
|
||||
}
|
||||
if (tenantGroup.present()) {
|
||||
json_spirit::mObject tenantGroupObject;
|
||||
std::string encodedTenantGroup = base64::encoder::from_string(tenantGroup.get().toString());
|
||||
// Remove trailing newline
|
||||
encodedTenantGroup.resize(encodedTenantGroup.size() - 1);
|
||||
|
||||
tenantGroupObject["base64"] = encodedTenantGroup;
|
||||
tenantGroupObject["printable"] = printable(tenantGroup.get());
|
||||
tenantEntry["tenant_group"] = tenantGroupObject;
|
||||
}
|
||||
|
||||
return json_spirit::write_string(json_spirit::mValue(tenantEntry));
|
||||
}
|
||||
|
||||
TEST_CASE("/fdbclient/TenantMapEntry/Serialization") {
|
||||
TenantMapEntry entry1(1, ""_sr, TenantState::READY);
|
||||
ASSERT(entry1.prefix == "\x00\x00\x00\x00\x00\x00\x00\x01"_sr);
|
||||
|
|
|
@ -1071,17 +1071,20 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
|||
state int64_t tenantId;
|
||||
state DataClusterMetadata clusterMetadata;
|
||||
state bool alreadyRemoving = false;
|
||||
state Future<Void> tenantModeCheck;
|
||||
state Future<ClusterType> clusterTypeFuture;
|
||||
|
||||
// Step 1: get the assigned location of the tenant
|
||||
state Reference<typename DB::TransactionT> managementTr = db->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
managementTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tenantModeCheck = TenantAPI::checkTenantMode(managementTr, ClusterType::METACLUSTER_MANAGEMENT);
|
||||
clusterTypeFuture = TenantAPI::getClusterType(managementTr);
|
||||
|
||||
state Optional<TenantMapEntry> tenantEntry1 = wait(tryGetTenantTransaction(managementTr, name));
|
||||
wait(tenantModeCheck);
|
||||
ClusterType clusterType = wait(clusterTypeFuture);
|
||||
if (clusterType != ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
if (!tenantEntry1.present()) {
|
||||
throw tenant_not_found();
|
||||
|
@ -1166,9 +1169,12 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
|||
loop {
|
||||
try {
|
||||
managementTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tenantModeCheck = TenantAPI::checkTenantMode(managementTr, ClusterType::METACLUSTER_MANAGEMENT);
|
||||
clusterTypeFuture = TenantAPI::getClusterType(managementTr);
|
||||
state Optional<TenantMapEntry> tenantEntry3 = wait(tryGetTenantTransaction(managementTr, name));
|
||||
wait(tenantModeCheck);
|
||||
ClusterType clusterType = wait(clusterTypeFuture);
|
||||
if (clusterType != ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
if (!tenantEntry3.present() || tenantEntry3.get().id != tenantId) {
|
||||
return Void();
|
||||
|
@ -1261,6 +1267,189 @@ Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenants(Reference
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <class DB>
|
||||
struct ConfigureTenantImpl {
|
||||
// Initialization parameters
|
||||
Reference<DB> managementDb;
|
||||
TenantName tenantName;
|
||||
std::map<Standalone<StringRef>, Optional<Value>> configurationParameters;
|
||||
|
||||
// Parameters set in updateManagementCluster
|
||||
TenantMapEntry updatedEntry;
|
||||
DataClusterMetadata clusterMetadata;
|
||||
|
||||
ConfigureTenantImpl(Reference<DB> managementDb,
|
||||
TenantName tenantName,
|
||||
std::map<Standalone<StringRef>, Optional<Value>> configurationParameters)
|
||||
: managementDb(managementDb), tenantName(tenantName), configurationParameters(configurationParameters) {}
|
||||
|
||||
ACTOR static Future<bool> checkTenantGroup(Optional<TenantGroupName> currentGroup,
|
||||
Optional<TenantGroupName> desiredGroup) {
|
||||
if (!desiredGroup.present() || currentGroup == desiredGroup) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: check where desired group is assigned and allow if the cluster is the same
|
||||
// SOMEDAY: It should also be possible to change the tenant group when we support tenant movement.
|
||||
wait(delay(0));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// Updates the configuration in the management cluster and marks it as being in the UPDATING_CONFIGURATION state
|
||||
// Returns true if the update is complete and false if it needs to proceed to the next stage
|
||||
ACTOR static Future<bool> updateManagementCluster(ConfigureTenantImpl* self) {
|
||||
state Reference<typename DB::TransactionT> tr = self->managementDb->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state Future<ClusterType> clusterTypeFuture = TenantAPI::getClusterType(tr);
|
||||
|
||||
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName));
|
||||
ClusterType clusterType = wait(clusterTypeFuture);
|
||||
if (clusterType != ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
if (!tenantEntry.present()) {
|
||||
throw tenant_not_found();
|
||||
}
|
||||
|
||||
if (tenantEntry.get().tenantState != TenantState::READY &&
|
||||
tenantEntry.get().tenantState != TenantState::UPDATING_CONFIGURATION) {
|
||||
throw invalid_tenant_state();
|
||||
}
|
||||
|
||||
self->updatedEntry = tenantEntry.get();
|
||||
state std::map<Standalone<StringRef>, Optional<Value>>::iterator configItr;
|
||||
for (configItr = self->configurationParameters.begin();
|
||||
configItr != self->configurationParameters.end();
|
||||
++configItr) {
|
||||
if (configItr->first == "tenant_group"_sr) {
|
||||
bool canChangeTenantGroup =
|
||||
wait(checkTenantGroup(self->updatedEntry.tenantGroup, configItr->second));
|
||||
if (!canChangeTenantGroup) {
|
||||
TraceEvent(SevWarnAlways, "InvalidTenantGroupChange")
|
||||
.detail("Tenant", self->tenantName)
|
||||
.detail("CurrentTenantGroup", self->updatedEntry.tenantGroup)
|
||||
.detail("DesiredTenantGroup", configItr->second);
|
||||
// TODO: surface better error to fdbcli?
|
||||
throw invalid_tenant_configuration();
|
||||
}
|
||||
}
|
||||
self->updatedEntry.configure(configItr->first, configItr->second);
|
||||
}
|
||||
|
||||
if (tenantEntry.get().assignedCluster.present()) {
|
||||
Optional<DataClusterMetadata> _clusterMetadata =
|
||||
wait(tryGetClusterTransaction(tr, tenantEntry.get().assignedCluster.get()));
|
||||
|
||||
// A cluster cannot be removed through these APIs unless it has no tenants assigned to it.
|
||||
ASSERT(_clusterMetadata.present());
|
||||
|
||||
self->clusterMetadata = _clusterMetadata.get();
|
||||
self->updatedEntry.tenantState = TenantState::UPDATING_CONFIGURATION;
|
||||
}
|
||||
|
||||
++self->updatedEntry.configurationSequenceNum;
|
||||
ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->updatedEntry);
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
|
||||
// If there is no assigned cluster, then we can terminate early
|
||||
return !tenantEntry.get().assignedCluster.present();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Updates the configuration in the data cluster
|
||||
ACTOR static Future<Void> updateDataCluster(ConfigureTenantImpl* self) {
|
||||
state Reference<IDatabase> dataClusterDb = wait(openDatabase(self->clusterMetadata.connectionString));
|
||||
state Reference<typename DB::TransactionT> tr = dataClusterDb->createTransaction();
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
state Future<Optional<MetaclusterRegistrationEntry>> metaclusterRegistrationFuture =
|
||||
MetaclusterMetadata::metaclusterRegistration.get(tr);
|
||||
|
||||
state Optional<TenantMapEntry> tenantEntry =
|
||||
wait(TenantAPI::tryGetTenantTransaction(tr, self->tenantName));
|
||||
state Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
|
||||
wait(metaclusterRegistrationFuture);
|
||||
|
||||
if (!tenantEntry.present() || tenantEntry.get().id != self->updatedEntry.id ||
|
||||
tenantEntry.get().configurationSequenceNum >= self->updatedEntry.configurationSequenceNum ||
|
||||
!metaclusterRegistration.present() ||
|
||||
metaclusterRegistration.get().clusterType != ClusterType::METACLUSTER_DATA) {
|
||||
// If the tenant or cluster isn't in the metacluster, it must have been concurrently removed
|
||||
return Void();
|
||||
}
|
||||
|
||||
self->updatedEntry.tenantState = TenantState::READY;
|
||||
TenantAPI::configureTenantTransaction(tr, self->tenantName, self->updatedEntry);
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Updates the tenant state in the management cluster to READY
|
||||
ACTOR static Future<Void> markManagementTenantAsReady(ConfigureTenantImpl* self) {
|
||||
state Reference<typename DB::TransactionT> tr = self->managementDb->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
state Future<ClusterType> clusterTypeFuture = TenantAPI::getClusterType(tr);
|
||||
|
||||
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, self->tenantName));
|
||||
ClusterType clusterType = wait(clusterTypeFuture);
|
||||
if (clusterType != ClusterType::METACLUSTER_MANAGEMENT) {
|
||||
throw invalid_metacluster_operation();
|
||||
}
|
||||
|
||||
if (!tenantEntry.present() || tenantEntry.get().id != self->updatedEntry.id ||
|
||||
tenantEntry.get().tenantState != TenantState::UPDATING_CONFIGURATION ||
|
||||
tenantEntry.get().configurationSequenceNum > self->updatedEntry.configurationSequenceNum) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
tenantEntry.get().tenantState = TenantState::READY;
|
||||
ManagementClusterMetadata::tenantMetadata.tenantMap.set(tr, self->tenantName, self->updatedEntry);
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> run(ConfigureTenantImpl* self) {
|
||||
bool updateIsComplete = wait(updateManagementCluster(self));
|
||||
if (!updateIsComplete) {
|
||||
wait(updateDataCluster(self));
|
||||
wait(markManagementTenantAsReady(self));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
Future<Void> run() { return run(this); }
|
||||
};
|
||||
|
||||
ACTOR template <class DB>
|
||||
Future<Void> configureTenant(Reference<DB> db,
|
||||
TenantName name,
|
||||
std::map<Standalone<StringRef>, Optional<Value>> configurationParameters) {
|
||||
state ConfigureTenantImpl<DB> impl(db, name, configurationParameters);
|
||||
wait(impl.run());
|
||||
return Void();
|
||||
}
|
||||
|
||||
}; // namespace MetaclusterAPI
|
||||
|
||||
#include "flow/unactorcompiler.h"
|
||||
|
|
|
@ -32,7 +32,7 @@ typedef Standalone<TenantNameRef> TenantName;
|
|||
typedef StringRef TenantGroupNameRef;
|
||||
typedef Standalone<TenantGroupNameRef> TenantGroupName;
|
||||
|
||||
enum class TenantState { REGISTERING, READY, REMOVING, ERROR };
|
||||
enum class TenantState { REGISTERING, READY, REMOVING, UPDATING_CONFIGURATION, ERROR };
|
||||
|
||||
struct TenantMapEntry {
|
||||
constexpr static FileIdentifier file_identifier = 12247338;
|
||||
|
@ -50,6 +50,7 @@ struct TenantMapEntry {
|
|||
TenantState tenantState = TenantState::READY;
|
||||
// TODO: fix this type
|
||||
Optional<Standalone<StringRef>> assignedCluster;
|
||||
int64_t configurationSequenceNum = 0;
|
||||
|
||||
constexpr static int ROOT_PREFIX_SIZE = sizeof(id);
|
||||
|
||||
|
@ -60,6 +61,10 @@ struct TenantMapEntry {
|
|||
void setSubspace(KeyRef subspace);
|
||||
bool matchesConfiguration(TenantMapEntry const& other) const;
|
||||
|
||||
void configure(Standalone<StringRef> parameter, Optional<Value> value);
|
||||
|
||||
std::string toJson(int apiVersion) const;
|
||||
|
||||
Value encode() const { return ObjectWriter::toValue(*this, IncludeVersion(ProtocolVersion::withTenantGroups())); }
|
||||
|
||||
static TenantMapEntry decode(ValueRef const& value) {
|
||||
|
@ -74,7 +79,7 @@ struct TenantMapEntry {
|
|||
KeyRef subspace;
|
||||
if (ar.isDeserializing) {
|
||||
if (ar.protocolVersion().hasTenantGroups()) {
|
||||
serializer(ar, id, subspace, tenantGroup, tenantState, assignedCluster);
|
||||
serializer(ar, id, subspace, tenantGroup, tenantState, assignedCluster, configurationSequenceNum);
|
||||
ASSERT(tenantState >= TenantState::REGISTERING && tenantState <= TenantState::ERROR);
|
||||
} else {
|
||||
serializer(ar, id, subspace);
|
||||
|
@ -89,7 +94,7 @@ struct TenantMapEntry {
|
|||
subspace = prefix.substr(0, prefix.size() - 8);
|
||||
}
|
||||
ASSERT(tenantState >= TenantState::REGISTERING && tenantState <= TenantState::ERROR);
|
||||
serializer(ar, id, subspace, tenantGroup, tenantState, assignedCluster);
|
||||
serializer(ar, id, subspace, tenantGroup, tenantState, assignedCluster, configurationSequenceNum);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -323,9 +323,9 @@ Future<Void> deleteTenant(Reference<DB> db,
|
|||
}
|
||||
}
|
||||
|
||||
// This should only be called from a transaction that has already confirmed that the cluster entry
|
||||
// is present. The updatedEntry should use the existing entry and modify only those fields that need
|
||||
// to be changed.
|
||||
// This should only be called from a transaction that has already confirmed that the tenant entry
|
||||
// is present. The tenantEntry should start with the existing entry and modify only those fields that need
|
||||
// to be changed. This must only be called on a non-management cluster.
|
||||
template <class Transaction>
|
||||
void configureTenantTransaction(Transaction tr, TenantNameRef tenantName, TenantMapEntry tenantEntry) {
|
||||
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
|
|
@ -32,7 +32,6 @@
|
|||
#include "fdbclient/SpecialKeySpace.actor.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
#include "fdbclient/libb64/encode.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -77,41 +76,8 @@ private:
|
|||
wait(TenantAPI::listTenantsTransaction(&ryw->getTransaction(), kr.begin, kr.end, limitsHint.rows));
|
||||
|
||||
for (auto tenant : tenants) {
|
||||
json_spirit::mObject tenantEntry;
|
||||
tenantEntry["id"] = tenant.second.id;
|
||||
|
||||
if (ryw->getDatabase()->apiVersionAtLeast(720)) {
|
||||
json_spirit::mObject prefixObject;
|
||||
std::string encodedPrefix = base64::encoder::from_string(tenant.second.prefix.toString());
|
||||
// Remove trailing newline
|
||||
encodedPrefix.resize(encodedPrefix.size() - 1);
|
||||
|
||||
prefixObject["base64"] = encodedPrefix;
|
||||
prefixObject["printable"] = printable(tenant.second.prefix);
|
||||
tenantEntry["prefix"] = prefixObject;
|
||||
} else {
|
||||
// This is not a standard encoding in JSON, and some libraries may not be able to easily decode it
|
||||
tenantEntry["prefix"] = tenant.second.prefix.toString();
|
||||
}
|
||||
|
||||
tenantEntry["tenant_state"] = TenantMapEntry::tenantStateToString(tenant.second.tenantState);
|
||||
if (tenant.second.assignedCluster.present()) {
|
||||
tenantEntry["assigned_cluster"] = tenant.second.assignedCluster.get().toString();
|
||||
}
|
||||
if (tenant.second.tenantGroup.present()) {
|
||||
json_spirit::mObject tenantGroupObject;
|
||||
std::string encodedTenantGroup =
|
||||
base64::encoder::from_string(tenant.second.tenantGroup.get().toString());
|
||||
// Remove trailing newline
|
||||
encodedTenantGroup.resize(encodedTenantGroup.size() - 1);
|
||||
|
||||
tenantGroupObject["base64"] = encodedTenantGroup;
|
||||
tenantGroupObject["printable"] = printable(tenant.second.tenantGroup.get());
|
||||
tenantEntry["tenant_group"] = tenantGroupObject;
|
||||
}
|
||||
|
||||
std::string tenantEntryString = json_spirit::write_string(json_spirit::mValue(tenantEntry));
|
||||
ValueRef tenantEntryBytes(results->arena(), tenantEntryString);
|
||||
std::string jsonString = tenant.second.toJson(ryw->getDatabase()->apiVersion);
|
||||
ValueRef tenantEntryBytes(results->arena(), jsonString);
|
||||
results->push_back(results->arena(),
|
||||
KeyValueRef(withTenantMapPrefix(tenant.first, results->arena()), tenantEntryBytes));
|
||||
}
|
||||
|
@ -141,75 +107,16 @@ private:
|
|||
return results;
|
||||
}
|
||||
|
||||
ACTOR static Future<bool> checkTenantGroup(ReadYourWritesTransaction* ryw,
|
||||
Optional<TenantGroupName> currentGroup,
|
||||
Optional<TenantGroupName> desiredGroup) {
|
||||
if (!desiredGroup.present() || currentGroup == desiredGroup) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: check where desired group is assigned and allow if the cluster is the same
|
||||
// SOMEDAY: It should also be possible to change the tenant group when we support tenant movement.
|
||||
wait(delay(0));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> applyTenantConfig(
|
||||
ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> configEntries,
|
||||
TenantMapEntry* tenantEntry,
|
||||
bool creatingTenant) {
|
||||
|
||||
state std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>::iterator configItr;
|
||||
for (configItr = configEntries.begin(); configItr != configEntries.end(); ++configItr) {
|
||||
if (configItr->first == "tenant_group"_sr) {
|
||||
state bool isValidTenantGroup = true;
|
||||
if (!creatingTenant) {
|
||||
bool result = wait(checkTenantGroup(ryw, tenantEntry->tenantGroup, configItr->second));
|
||||
isValidTenantGroup = result;
|
||||
}
|
||||
if (isValidTenantGroup) {
|
||||
tenantEntry->tenantGroup = configItr->second;
|
||||
} else {
|
||||
TraceEvent(SevWarn, "CannotChangeTenantGroup")
|
||||
.detail("TenantName", tenantName)
|
||||
.detail("CurrentTenantGroup", tenantEntry->tenantGroup)
|
||||
.detail("DesiredTenantGroup", configItr->second);
|
||||
ryw->setSpecialKeySpaceErrorMsg(ManagementAPIError::toJsonString(
|
||||
false,
|
||||
"set tenant configuration",
|
||||
format("cannot change tenant group for tenant `%s'", tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevWarn, "InvalidTenantConfig")
|
||||
.detail("TenantName", tenantName)
|
||||
.detail("ConfigName", configItr->first);
|
||||
ryw->setSpecialKeySpaceErrorMsg(
|
||||
ManagementAPIError::toJsonString(false,
|
||||
"set tenant configuration",
|
||||
format("invalid tenant configuration option `%s' for tenant `%s'",
|
||||
configItr->first.toString().c_str(),
|
||||
tenantName.toString().c_str())));
|
||||
throw special_keys_api_failure();
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> createTenant(
|
||||
ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
Optional<std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> configMutations,
|
||||
std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> configMutations,
|
||||
int64_t tenantId) {
|
||||
state TenantMapEntry tenantEntry;
|
||||
tenantEntry.id = tenantId;
|
||||
|
||||
if (configMutations.present()) {
|
||||
wait(applyTenantConfig(ryw, tenantName, configMutations.get(), &tenantEntry, true));
|
||||
for (auto const& [name, value] : configMutations) {
|
||||
tenantEntry.configure(name, value);
|
||||
}
|
||||
|
||||
std::pair<Optional<TenantMapEntry>, bool> entry =
|
||||
|
@ -220,7 +127,7 @@ private:
|
|||
|
||||
ACTOR static Future<Void> createTenants(
|
||||
ReadYourWritesTransaction* ryw,
|
||||
std::map<TenantName, Optional<std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>>> tenants) {
|
||||
std::map<TenantName, std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> tenants) {
|
||||
int64_t _nextId = wait(TenantAPI::getNextTenantId(&ryw->getTransaction()));
|
||||
int64_t nextId = _nextId;
|
||||
|
||||
|
@ -236,10 +143,12 @@ private:
|
|||
|
||||
ACTOR static Future<Void> changeTenantConfig(
|
||||
ReadYourWritesTransaction* ryw,
|
||||
TenantNameRef tenantName,
|
||||
TenantName tenantName,
|
||||
std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> configEntries) {
|
||||
state TenantMapEntry tenantEntry = wait(TenantAPI::getTenantTransaction(ryw, tenantName));
|
||||
wait(applyTenantConfig(ryw, tenantName, configEntries, &tenantEntry, false));
|
||||
state TenantMapEntry tenantEntry = wait(TenantAPI::getTenantTransaction(&ryw->getTransaction(), tenantName));
|
||||
for (auto const& [name, value] : configEntries) {
|
||||
tenantEntry.configure(name, value);
|
||||
}
|
||||
TenantAPI::configureTenantTransaction(&ryw->getTransaction(), tenantName, tenantEntry);
|
||||
|
||||
return Void();
|
||||
|
@ -248,13 +157,9 @@ private:
|
|||
ACTOR static Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw,
|
||||
TenantName beginTenant,
|
||||
TenantName endTenant) {
|
||||
state Future<Void> tenantModeCheck =
|
||||
TenantAPI::checkTenantMode(&ryw->getTransaction(), ClusterType::STANDALONE);
|
||||
state std::vector<std::pair<TenantName, TenantMapEntry>> tenants = wait(TenantAPI::listTenantsTransaction(
|
||||
&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER + 1));
|
||||
|
||||
wait(tenantModeCheck);
|
||||
|
||||
if (tenants.size() > CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER) {
|
||||
TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
|
||||
.detail("BeginTenant", beginTenant)
|
||||
|
@ -296,6 +201,8 @@ public:
|
|||
std::vector<std::pair<KeyRangeRef, Optional<Value>>> mapMutations;
|
||||
std::map<TenantName, std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> configMutations;
|
||||
|
||||
tenantManagementFutures.push_back(TenantAPI::checkTenantMode(&ryw->getTransaction(), ClusterType::STANDALONE));
|
||||
|
||||
for (auto range : ranges) {
|
||||
if (!range.value().first) {
|
||||
continue;
|
||||
|
@ -328,11 +235,11 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
std::map<TenantName, Optional<std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>>> tenantsToCreate;
|
||||
std::map<TenantName, std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> tenantsToCreate;
|
||||
for (auto mapMutation : mapMutations) {
|
||||
TenantNameRef tenantName = mapMutation.first.begin;
|
||||
if (mapMutation.second.present()) {
|
||||
Optional<std::vector<std::pair<Standalone<StringRef>, Optional<Value>>>> createMutations;
|
||||
std::vector<std::pair<Standalone<StringRef>, Optional<Value>>> createMutations;
|
||||
auto itr = configMutations.find(tenantName);
|
||||
if (itr != configMutations.end()) {
|
||||
createMutations = itr->second;
|
||||
|
|
|
@ -9,6 +9,7 @@ For details, see http://sourceforge.net/projects/libb64
|
|||
#define BASE64_DECODE_H
|
||||
|
||||
#include <iostream>
|
||||
#include "fdbclient/libb64/encode.h"
|
||||
|
||||
namespace base64 {
|
||||
extern "C" {
|
||||
|
|
|
@ -259,17 +259,17 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Creates tenant(s) using the specified operation type
|
||||
ACTOR Future<Void> createImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
std::map<TenantName, TenantMapEntry> tenantsToCreate,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> createImpl(Reference<ReadYourWritesTransaction> tr,
|
||||
std::map<TenantName, TenantMapEntry> tenantsToCreate,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
for (auto [tenant, entry] : tenantsToCreate) {
|
||||
tr->set(self->specialKeysTenantMapPrefix.withSuffix(tenant), ""_sr);
|
||||
if (entry.tenantGroup.present()) {
|
||||
tr->set(self->specialKeysTenantConfigPrefix.withSuffix("tenant_group/"_sr).withSuffix(tenant),
|
||||
tr->set(self->specialKeysTenantConfigPrefix.withSuffix(
|
||||
Tuple::makeTuple(tenant, "tenant_group"_sr).pack()),
|
||||
entry.tenantGroup.get());
|
||||
}
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createTenant(Database cx, TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> createTenant(TenantManagementWorkload* self) {
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
int numTenants = 1;
|
||||
|
||||
|
@ -339,9 +339,8 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
state bool retried = false;
|
||||
loop {
|
||||
try {
|
||||
Optional<Void> result =
|
||||
wait(timeout(self->createImpl(cx, tr, tenantsToCreate, operationType, self),
|
||||
deterministicRandom()->randomInt(1, 30)));
|
||||
Optional<Void> result = wait(timeout(createImpl(tr, tenantsToCreate, operationType, self),
|
||||
deterministicRandom()->randomInt(1, 30)));
|
||||
|
||||
if (result.present()) {
|
||||
// Database operations shouldn't get here if the tenant already exists
|
||||
|
@ -461,7 +460,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Perform some final tenant validation
|
||||
wait(self->checkTenantContents(self, tenantItr->first, self->createdTenants[tenantItr->first]));
|
||||
wait(checkTenantContents(self, tenantItr->first, self->createdTenants[tenantItr->first]));
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -507,13 +506,12 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Deletes the tenant or tenant range using the specified operation type
|
||||
ACTOR Future<Void> deleteImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName beginTenant,
|
||||
Optional<TenantName> endTenant,
|
||||
std::vector<TenantName> tenants,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> deleteImpl(Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName beginTenant,
|
||||
Optional<TenantName> endTenant,
|
||||
std::vector<TenantName> tenants,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
state int tenantIndex;
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
|
@ -544,7 +542,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteTenant(Database cx, TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> deleteTenant(TenantManagementWorkload* self) {
|
||||
state TenantName beginTenant = self->chooseTenantName(true);
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
@ -630,7 +628,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
loop {
|
||||
try {
|
||||
Optional<Void> result =
|
||||
wait(timeout(self->deleteImpl(cx, tr, beginTenant, endTenant, tenants, operationType, self),
|
||||
wait(timeout(deleteImpl(tr, beginTenant, endTenant, tenants, operationType, self),
|
||||
deterministicRandom()->randomInt(1, 30)));
|
||||
|
||||
if (result.present()) {
|
||||
|
@ -748,7 +746,9 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Performs some validation on a tenant's contents
|
||||
ACTOR Future<Void> checkTenantContents(TenantManagementWorkload* self, TenantName tenant, TenantData tenantData) {
|
||||
ACTOR static Future<Void> checkTenantContents(TenantManagementWorkload* self,
|
||||
TenantName tenant,
|
||||
TenantData tenantData) {
|
||||
state Transaction tr(self->dataDb, tenant);
|
||||
loop {
|
||||
try {
|
||||
|
@ -826,11 +826,10 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Gets the metadata for a tenant using the specified operation type
|
||||
ACTOR Future<TenantMapEntry> getImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName tenant,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
ACTOR static Future<TenantMapEntry> getImpl(Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName tenant,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
state TenantMapEntry entry;
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
Key key = self->specialKeysTenantMapPrefix.withSuffix(tenant);
|
||||
|
@ -854,7 +853,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return entry;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getTenant(Database cx, TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> getTenant(TenantManagementWorkload* self) {
|
||||
state TenantName tenant = self->chooseTenantName(true);
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
@ -868,11 +867,11 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
loop {
|
||||
try {
|
||||
// Get the tenant metadata and check that it matches our local state
|
||||
state TenantMapEntry entry = wait(self->getImpl(cx, tr, tenant, operationType, self));
|
||||
state TenantMapEntry entry = wait(getImpl(tr, tenant, operationType, self));
|
||||
ASSERT(alreadyExists);
|
||||
ASSERT(entry.id == tenantData.id);
|
||||
ASSERT(entry.tenantGroup == tenantData.tenantGroup);
|
||||
wait(self->checkTenantContents(self, tenant, tenantData));
|
||||
wait(checkTenantContents(self, tenant, tenantData));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
state bool retry = false;
|
||||
|
@ -904,13 +903,13 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
// Gets a list of tenants using the specified operation type
|
||||
ACTOR Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName beginTenant,
|
||||
TenantName endTenant,
|
||||
int limit,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
ACTOR static Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listImpl(
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName beginTenant,
|
||||
TenantName endTenant,
|
||||
int limit,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
state std::vector<std::pair<TenantName, TenantMapEntry>> tenants;
|
||||
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
|
@ -938,7 +937,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return tenants;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> listTenants(Database cx, TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> listTenants(TenantManagementWorkload* self) {
|
||||
state TenantName beginTenant = self->chooseTenantName(false);
|
||||
state TenantName endTenant = self->chooseTenantName(false);
|
||||
state int limit = std::min(CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER + 1,
|
||||
|
@ -954,7 +953,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
try {
|
||||
// Attempt to read the chosen list of tenants
|
||||
state std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
|
||||
wait(self->listImpl(cx, tr, beginTenant, endTenant, limit, operationType, self));
|
||||
wait(listImpl(tr, beginTenant, endTenant, limit, operationType, self));
|
||||
|
||||
// Attempting to read the list of tenants using the metacluster API in a non-metacluster should
|
||||
// return nothing in this test
|
||||
|
@ -1005,7 +1004,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> renameTenant(Database cx, TenantManagementWorkload* self) {
|
||||
ACTOR static Future<Void> renameTenant(TenantManagementWorkload* self) {
|
||||
// Currently only supporting MANAGEMENT_DATABASE op, so numTenants should always be 1
|
||||
// state OperationType operationType = TenantManagementWorkload::randomOperationType();
|
||||
int numTenants = 1;
|
||||
|
@ -1035,7 +1034,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
state TenantName oldTenantName = oldTenantNames[tenantIndex];
|
||||
state TenantName newTenantName = newTenantNames[tenantIndex];
|
||||
// Perform rename, then check against the DB for the new results
|
||||
wait(TenantAPI::renameTenant(cx.getReference(), oldTenantName, newTenantName));
|
||||
wait(TenantAPI::renameTenant(self->dataDb.getReference(), oldTenantName, newTenantName));
|
||||
ASSERT(!tenantNotFound && !tenantExists);
|
||||
state Optional<TenantMapEntry> oldTenantEntry =
|
||||
wait(self->tryGetTenant(oldTenantName, OperationType::SPECIAL_KEYS));
|
||||
|
@ -1049,7 +1048,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
self->createdTenants[newTenantName] = tData;
|
||||
self->createdTenants.erase(oldTenantName);
|
||||
if (!tData.empty) {
|
||||
state Transaction insertTr(cx, newTenantName);
|
||||
state Transaction insertTr(self->dataDb, newTenantName);
|
||||
loop {
|
||||
try {
|
||||
insertTr.set(self->keyName, newTenantName);
|
||||
|
@ -1060,7 +1059,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
}
|
||||
wait(self->checkTenantContents(self, newTenantName, self->createdTenants[newTenantName]));
|
||||
wait(checkTenantContents(self, newTenantName, self->createdTenants[newTenantName]));
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
|
@ -1086,31 +1085,18 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> configureTenant(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName tenant = self->chooseTenantName(true);
|
||||
auto itr = self->createdTenants.find(tenant);
|
||||
state bool exists = itr != self->createdTenants.end();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
// Gets a list of tenants using the specified operation type
|
||||
ACTOR static Future<bool> configureImpl(Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName tenant,
|
||||
std::map<Standalone<StringRef>, Optional<Value>> configParameters,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
state bool hasInvalidSpecialKeyTuple = deterministicRandom()->random01() < 0.05;
|
||||
|
||||
state std::map<Standalone<StringRef>, Optional<Value>> configuration;
|
||||
state Optional<TenantGroupName> newTenantGroup;
|
||||
state bool hasInvalidOption = deterministicRandom()->random01() < 0.1;
|
||||
|
||||
if (!hasInvalidOption || deterministicRandom()->coinflip()) {
|
||||
newTenantGroup = self->chooseTenantGroup();
|
||||
configuration["tenant_group"_sr] = newTenantGroup;
|
||||
}
|
||||
if (hasInvalidOption) {
|
||||
configuration["invalid_option"_sr] = ""_sr;
|
||||
hasInvalidOption = true;
|
||||
}
|
||||
|
||||
state bool hasInvalidSpecialKeyTuple = deterministicRandom()->random01() < 0.05;
|
||||
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
for (auto [config, value] : configuration) {
|
||||
for (auto const& [config, value] : configParameters) {
|
||||
Tuple t;
|
||||
if (hasInvalidSpecialKeyTuple) {
|
||||
// Wrong number of items
|
||||
|
@ -1142,12 +1128,64 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
wait(tr->commit());
|
||||
|
||||
ASSERT(exists);
|
||||
ASSERT(!hasInvalidOption);
|
||||
ASSERT(!hasInvalidSpecialKeyTuple);
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_special_keys_api_failure && hasInvalidSpecialKeyTuple) {
|
||||
return false;
|
||||
}
|
||||
|
||||
self->createdTenants[tenant].tenantGroup = newTenantGroup;
|
||||
throw;
|
||||
}
|
||||
} else if (operationType == OperationType::METACLUSTER) {
|
||||
wait(MetaclusterAPI::configureTenant(self->mvDb, tenant, configParameters));
|
||||
return true;
|
||||
} else {
|
||||
// We don't have a transaction or database variant of this function
|
||||
ASSERT(false);
|
||||
throw internal_error();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> configureTenant(TenantManagementWorkload* self) {
|
||||
state OperationType operationType =
|
||||
deterministicRandom()->coinflip() ? OperationType::SPECIAL_KEYS : OperationType::METACLUSTER;
|
||||
|
||||
state TenantName tenant = self->chooseTenantName(true);
|
||||
auto itr = self->createdTenants.find(tenant);
|
||||
state bool exists = itr != self->createdTenants.end();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
||||
state std::map<Standalone<StringRef>, Optional<Value>> configuration;
|
||||
state Optional<TenantGroupName> newTenantGroup;
|
||||
state bool hasInvalidOption = deterministicRandom()->random01() < 0.1;
|
||||
|
||||
state bool hasInvalidTenantGroupChange = false;
|
||||
|
||||
if (!hasInvalidOption || deterministicRandom()->coinflip()) {
|
||||
newTenantGroup = self->chooseTenantGroup();
|
||||
configuration["tenant_group"_sr] = newTenantGroup;
|
||||
|
||||
// We cannot currently change tenant groups in a metacluster
|
||||
if (operationType == OperationType::METACLUSTER && exists && newTenantGroup.present() &&
|
||||
newTenantGroup != itr->second.tenantGroup) {
|
||||
hasInvalidTenantGroupChange = true;
|
||||
}
|
||||
}
|
||||
if (hasInvalidOption) {
|
||||
configuration["invalid_option"_sr] = ""_sr;
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
bool configured = wait(configureImpl(tr, tenant, configuration, operationType, self));
|
||||
|
||||
if (configured) {
|
||||
ASSERT(exists);
|
||||
ASSERT(!hasInvalidOption);
|
||||
ASSERT(!hasInvalidTenantGroupChange);
|
||||
self->createdTenants[tenant].tenantGroup = newTenantGroup;
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
state Error error = e;
|
||||
|
@ -1155,7 +1193,16 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
ASSERT(!exists);
|
||||
return Void();
|
||||
} else if (e.code() == error_code_special_keys_api_failure) {
|
||||
ASSERT(hasInvalidSpecialKeyTuple || hasInvalidOption);
|
||||
ASSERT(hasInvalidOption);
|
||||
return Void();
|
||||
} else if (e.code() == error_code_invalid_tenant_configuration) {
|
||||
ASSERT(hasInvalidOption || hasInvalidTenantGroupChange);
|
||||
return Void();
|
||||
} else if (e.code() == error_code_tenants_disabled) {
|
||||
ASSERT((operationType == OperationType::METACLUSTER) != self->useMetacluster);
|
||||
return Void();
|
||||
} else if (e.code() == error_code_invalid_metacluster_operation) {
|
||||
ASSERT(operationType == OperationType::METACLUSTER && !self->useMetacluster);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -1177,18 +1224,18 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
while (now() < start + self->testDuration) {
|
||||
state int operation = deterministicRandom()->randomInt(0, 6);
|
||||
if (operation == 0) {
|
||||
wait(self->createTenant(cx, self));
|
||||
wait(createTenant(self));
|
||||
} else if (operation == 1) {
|
||||
wait(self->deleteTenant(cx, self));
|
||||
wait(deleteTenant(self));
|
||||
} else if (operation == 2) {
|
||||
wait(self->getTenant(cx, self));
|
||||
wait(getTenant(self));
|
||||
} else if (operation == 3) {
|
||||
wait(self->listTenants(cx, self));
|
||||
wait(listTenants(self));
|
||||
} else if (operation == 4 && !self->useMetacluster) {
|
||||
// TODO: reenable this for metacluster once it is supported
|
||||
wait(self->renameTenant(cx, self));
|
||||
wait(renameTenant(self));
|
||||
} else if (operation == 5) {
|
||||
wait(self->configureTenant(cx, self));
|
||||
wait(configureTenant(self));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1247,7 +1294,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
ASSERT(managementItr->second.tenantGroup == localItr->second.tenantGroup);
|
||||
ASSERT(managementItr->second.matchesConfiguration(dataItr->second));
|
||||
|
||||
checkTenants.push_back(self->checkTenantContents(self, managementItr->first, localItr->second));
|
||||
checkTenants.push_back(checkTenantContents(self, managementItr->first, localItr->second));
|
||||
lastTenant = managementItr->first;
|
||||
|
||||
++localItr;
|
||||
|
|
|
@ -235,6 +235,8 @@ ERROR( unknown_tenant, 2137, "Tenant is not available from this server" )
|
|||
ERROR( illegal_tenant_access, 2138, "Illegal tenant access" )
|
||||
ERROR( invalid_tenant_group_name, 2139, "Tenant group name cannot begin with \\xff" )
|
||||
ERROR( tenant_removed, 2140, "The tenant was removed" )
|
||||
ERROR( invalid_tenant_configuration, 2141, "Tenant configuration is invalid" )
|
||||
ERROR( invalid_tenant_state, 2142, "Operation cannot be applied to tenant in its current state" )
|
||||
|
||||
ERROR( invalid_cluster_name, 2150, "Data cluster name cannot begin with \\xff" )
|
||||
ERROR( invalid_metacluster_operation, 2151, "Metacluster operation performed on non-metacluster" )
|
||||
|
|
Loading…
Reference in New Issue