Start some work toward simulating a metacluster. Fix a few bugs found by the initial tests.
This commit is contained in:
parent
27aeb0fe48
commit
38b90910d6
|
@ -84,6 +84,7 @@ set(FDBCLIENT_SRCS
|
|||
ManagementAPI.actor.h
|
||||
Metacluster.cpp
|
||||
Metacluster.h
|
||||
MetaclusterManagement.actor.cpp
|
||||
MetaclusterManagement.actor.h
|
||||
MonitorLeader.actor.cpp
|
||||
MonitorLeader.h
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* MetaclusterManagement.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/MetaclusterManagement.actor.h"
|
||||
#include "fdbclient/ThreadSafeTransaction.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
namespace MetaclusterAPI {
|
||||
|
||||
ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connectionString) {
|
||||
Reference<IClusterConnectionRecord> clusterFile = makeReference<ClusterConnectionMemoryRecord>(connectionString);
|
||||
if (g_network->isSimulated()) {
|
||||
Database nativeDb = Database::createDatabase(clusterFile, -1);
|
||||
Reference<IDatabase> threadSafeDb =
|
||||
wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(nativeDb)));
|
||||
return MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeDb);
|
||||
} else {
|
||||
return MultiVersionApi::api->createDatabase(clusterFile);
|
||||
}
|
||||
}
|
||||
}; // namespace MetaclusterAPI
|
|
@ -85,6 +85,8 @@ FDB_DECLARE_BOOLEAN_PARAM(RemoveMissingTenants);
|
|||
|
||||
namespace MetaclusterAPI {
|
||||
|
||||
ACTOR Future<Reference<IDatabase>> openDatabase(ClusterConnectionString connectionString);
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Optional<DataClusterMetadata>> tryGetClusterTransaction(Transaction tr, ClusterNameRef name) {
|
||||
state Key dataClusterMetadataKey = name.withPrefix(dataClusterMetadataPrefix);
|
||||
|
@ -293,16 +295,7 @@ Future<Void> registerCluster(Reference<DB> db,
|
|||
wait(managementClusterRegisterPrecheck(precheckTr, name, Optional<DataClusterMetadata>()));
|
||||
metaclusterId = result.first;
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(precheckTr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(buggifiedCommit(precheckTr, BUGGIFY));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(precheckTr->onError(e)));
|
||||
|
@ -310,9 +303,7 @@ Future<Void> registerCluster(Reference<DB> db,
|
|||
}
|
||||
|
||||
// Step 2: Configure the data cluster as a subordinate cluster
|
||||
state Reference<IDatabase> dataClusterDb =
|
||||
MultiVersionApi::api->createDatabase(makeReference<ClusterConnectionMemoryRecord>(connectionString));
|
||||
|
||||
state Reference<IDatabase> dataClusterDb = wait(openDatabase(connectionString));
|
||||
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
|
@ -320,15 +311,7 @@ Future<Void> registerCluster(Reference<DB> db,
|
|||
UID clusterId = wait(dataClusterRegister(dataClusterTr, name, metaclusterId));
|
||||
entry.id = clusterId;
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(dataClusterTr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(dataClusterTr, BUGGIFY));
|
||||
|
||||
TraceEvent("ConfiguredDataCluster")
|
||||
.detail("ClusterName", name)
|
||||
|
@ -349,16 +332,7 @@ Future<Void> registerCluster(Reference<DB> db,
|
|||
try {
|
||||
registerTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
wait(managementClusterRegister(registerTr, name, connectionString, entry));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(registerTr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(registerTr, BUGGIFY));
|
||||
|
||||
TraceEvent("RegisteredDataCluster")
|
||||
.detail("ClusterName", name)
|
||||
|
@ -403,15 +377,7 @@ Future<Void> restoreCluster(Reference<DB> db,
|
|||
state Optional<DataClusterEntry> newCluster =
|
||||
wait(restoreCluster(tr, name, connectionString, entry, addNewTenants, removeMissingTenants));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
|
||||
TraceEvent("RestoredDataCluster")
|
||||
.detail("ClusterName", name)
|
||||
|
@ -469,16 +435,7 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
|
|||
}
|
||||
|
||||
metadata = _metadata.get();
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
|
||||
TraceEvent("RemovedDataCluster").detail("Name", name).detail("Version", tr->getCommittedVersion());
|
||||
break;
|
||||
|
@ -489,25 +446,14 @@ Future<Void> removeCluster(Reference<DB> db, ClusterName name, bool forceRemove)
|
|||
|
||||
// Step 2: Reconfigure data cluster and remove metadata.
|
||||
// Note that this is best effort; if it fails the cluster will still have been removed.
|
||||
state Reference<IDatabase> dataClusterDb =
|
||||
MultiVersionApi::api->createDatabase(makeReference<ClusterConnectionMemoryRecord>(metadata.connectionString));
|
||||
|
||||
state Reference<IDatabase> dataClusterDb = wait(openDatabase(metadata.connectionString));
|
||||
state Reference<ITransaction> dataClusterTr = dataClusterDb->createTransaction();
|
||||
loop {
|
||||
try {
|
||||
dataClusterTr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
wait(dataClusterRemove(dataClusterTr));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(dataClusterTr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(dataClusterTr, BUGGIFY));
|
||||
|
||||
TraceEvent("ReconfiguredDataCluster")
|
||||
.detail("Name", name)
|
||||
|
@ -671,7 +617,8 @@ Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tena
|
|||
createdTenant = result.first;
|
||||
|
||||
if (!result.second) {
|
||||
if (!result.first.matchesConfiguration(tenantEntry)) {
|
||||
if (!result.first.matchesConfiguration(tenantEntry) ||
|
||||
result.first.tenantState != TenantState::REGISTERING) {
|
||||
throw tenant_already_exists();
|
||||
} else if (tenantEntry.assignedCluster != createdTenant.assignedCluster) {
|
||||
if (!result.first.assignedCluster.present()) {
|
||||
|
@ -687,7 +634,7 @@ Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tena
|
|||
clusterMetadata = actualMetadata.get();
|
||||
}
|
||||
} else {
|
||||
wait(safeThreadFutureToFuture(assignTr->commit()));
|
||||
wait(buggifiedCommit(assignTr, BUGGIFY));
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -697,9 +644,7 @@ Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tena
|
|||
}
|
||||
|
||||
// Step 2: store the tenant info in the data cluster
|
||||
state Reference<IDatabase> dataClusterDb = MultiVersionApi::api->createDatabase(
|
||||
makeReference<ClusterConnectionMemoryRecord>(clusterMetadata.connectionString));
|
||||
|
||||
state Reference<IDatabase> dataClusterDb = wait(openDatabase(clusterMetadata.connectionString));
|
||||
TenantMapEntry _ = wait(ManagementAPI::createTenant(
|
||||
dataClusterDb, name, createdTenant, ManagementAPI::TenantOperationType::DATA_CLUSTER));
|
||||
|
||||
|
@ -719,7 +664,7 @@ Future<Void> createTenant(Reference<DB> db, TenantName name, TenantMapEntry tena
|
|||
TenantMapEntry updatedEntry = managementEntry.get();
|
||||
updatedEntry.tenantState = TenantState::READY;
|
||||
ManagementAPI::configureTenantTransaction(finalizeTr, name, updatedEntry);
|
||||
wait(safeThreadFutureToFuture(finalizeTr->commit()));
|
||||
wait(buggifiedCommit(finalizeTr, BUGGIFY));
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -735,7 +680,6 @@ ACTOR template <class DB>
|
|||
Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
||||
state int64_t tenantId;
|
||||
state DataClusterMetadata clusterMetadata;
|
||||
state Reference<IDatabase> dataClusterDb;
|
||||
state bool alreadyRemoving = false;
|
||||
|
||||
// Step 1: get the assigned location of the tenant
|
||||
|
@ -768,7 +712,7 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
|||
ManagementAPI::configureTenantTransaction(managementTr, name, updatedEntry);
|
||||
wait(ManagementAPI::deleteTenantTransaction(
|
||||
managementTr, name, ManagementAPI::TenantOperationType::MANAGEMENT_CLUSTER));
|
||||
wait(safeThreadFutureToFuture(managementTr->commit()));
|
||||
wait(buggifiedCommit(managementTr, BUGGIFY));
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -778,8 +722,7 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
|||
}
|
||||
}
|
||||
|
||||
dataClusterDb = MultiVersionApi::api->createDatabase(
|
||||
makeReference<ClusterConnectionMemoryRecord>(clusterMetadata.connectionString));
|
||||
state Reference<IDatabase> dataClusterDb = wait(openDatabase(clusterMetadata.connectionString));
|
||||
|
||||
if (!alreadyRemoving) {
|
||||
// Step 2: check that the tenant is empty
|
||||
|
@ -814,7 +757,7 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
|||
TenantMapEntry updatedEntry = tenantEntry2.get();
|
||||
updatedEntry.tenantState = TenantState::REMOVING;
|
||||
ManagementAPI::configureTenantTransaction(managementTr, name, updatedEntry);
|
||||
wait(safeThreadFutureToFuture(managementTr->commit()));
|
||||
wait(buggifiedCommit(managementTr, BUGGIFY));
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -876,7 +819,7 @@ Future<Void> deleteTenant(Reference<DB> db, TenantName name) {
|
|||
}
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(managementTr->commit()));
|
||||
wait(buggifiedCommit(managementTr, BUGGIFY));
|
||||
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
|
|
@ -189,33 +189,25 @@ Future<TenantMapEntry> createTenant(Reference<DB> db,
|
|||
TenantOperationType operationType = TenantOperationType::STANDALONE_CLUSTER) {
|
||||
state Reference<typename DB::TransactionT> tr = db->createTransaction();
|
||||
|
||||
state bool firstTry = true;
|
||||
state bool checkExistence = operationType != TenantOperationType::DATA_CLUSTER;
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
|
||||
if (firstTry) {
|
||||
if (checkExistence) {
|
||||
Optional<TenantMapEntry> entry = wait(tryGetTenantTransaction(tr, name));
|
||||
if (entry.present()) {
|
||||
throw tenant_already_exists();
|
||||
}
|
||||
|
||||
firstTry = false;
|
||||
checkExistence = false;
|
||||
}
|
||||
|
||||
state std::pair<TenantMapEntry, bool> newTenant =
|
||||
wait(createTenantTransaction(tr, name, tenantEntry, operationType));
|
||||
|
||||
if (newTenant.second) {
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
|
||||
TraceEvent("CreatedTenant")
|
||||
.detail("Tenant", name)
|
||||
|
@ -245,14 +237,7 @@ Future<Void> deleteTenantTransaction(Transaction tr,
|
|||
tr->get(configKeysPrefix.withSuffix("tenant_mode"_sr));
|
||||
state typename transaction_future_type<Transaction, Optional<Value>>::type metaclusterRegistrationFuture =
|
||||
tr->get(dataClusterRegistrationKey);
|
||||
|
||||
state Optional<TenantMapEntry> tenantEntry = wait(tryGetTenantTransaction(tr, name));
|
||||
if (!tenantEntry.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
|
||||
tr->getRange(prefixRange(tenantEntry.get().prefix), 1);
|
||||
state Future<Optional<TenantMapEntry>> tenantEntryFuture = tryGetTenantTransaction(tr, name);
|
||||
|
||||
state Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
|
||||
Optional<Value> metaclusterRegistration = wait(safeThreadFutureToFuture(metaclusterRegistrationFuture));
|
||||
|
@ -261,12 +246,20 @@ Future<Void> deleteTenantTransaction(Transaction tr,
|
|||
throw tenants_disabled();
|
||||
}
|
||||
|
||||
state Optional<TenantMapEntry> tenantEntry = wait(tenantEntryFuture);
|
||||
if (!tenantEntry.present()) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
if (operationType == TenantOperationType::MANAGEMENT_CLUSTER &&
|
||||
tenantEntry.get().tenantState != TenantState::REMOVING) {
|
||||
// TODO: better error
|
||||
throw operation_failed();
|
||||
}
|
||||
|
||||
state typename transaction_future_type<Transaction, RangeResult>::type prefixRangeFuture =
|
||||
tr->getRange(prefixRange(tenantEntry.get().prefix), 1);
|
||||
|
||||
RangeResult contents = wait(safeThreadFutureToFuture(prefixRangeFuture));
|
||||
if (!contents.empty()) {
|
||||
throw tenant_not_empty();
|
||||
|
@ -301,16 +294,7 @@ Future<Void> deleteTenant(Reference<DB> db,
|
|||
}
|
||||
|
||||
wait(deleteTenantTransaction(tr, name, operationType));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
|
||||
if (BUGGIFY) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
wait(buggifiedCommit(tr, BUGGIFY));
|
||||
|
||||
TraceEvent("DeletedTenant").detail("Tenant", name).detail("Version", tr->getCommittedVersion());
|
||||
return Void();
|
||||
|
|
|
@ -124,9 +124,21 @@ Future<RangeResult> TenantRangeImpl::getRange(ReadYourWritesTransaction* ryw,
|
|||
}
|
||||
|
||||
ACTOR Future<Void> deleteTenantRange(ReadYourWritesTransaction* ryw, TenantName beginTenant, TenantName endTenant) {
|
||||
std::map<TenantName, TenantMapEntry> tenants = wait(
|
||||
state Future<Optional<Value>> tenantModeFuture =
|
||||
ryw->getTransaction().get(configKeysPrefix.withSuffix("tenant_mode"_sr));
|
||||
state Future<Optional<Value>> metaclusterRegistrationFuture = ryw->getTransaction().get(dataClusterRegistrationKey);
|
||||
|
||||
state std::map<TenantName, TenantMapEntry> tenants = wait(
|
||||
ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, CLIENT_KNOBS->TOO_MANY));
|
||||
|
||||
state Optional<Value> tenantMode = wait(safeThreadFutureToFuture(tenantModeFuture));
|
||||
Optional<Value> metaclusterRegistration = wait(safeThreadFutureToFuture(metaclusterRegistrationFuture));
|
||||
|
||||
if (!checkTenantMode(
|
||||
tenantMode, metaclusterRegistration.present(), ManagementAPI::TenantOperationType::STANDALONE_CLUSTER)) {
|
||||
throw tenants_disabled();
|
||||
}
|
||||
|
||||
if (tenants.size() == CLIENT_KNOBS->TOO_MANY) {
|
||||
TraceEvent(SevWarn, "DeleteTenantRangeTooLange")
|
||||
.detail("BeginTenant", beginTenant)
|
||||
|
|
|
@ -22,6 +22,9 @@
|
|||
#include <limits>
|
||||
#include "fdbclient/FDBOptions.g.h"
|
||||
#include "fdbclient/GenericManagementAPI.actor.h"
|
||||
#include "fdbclient/MetaclusterManagement.actor.h"
|
||||
#include "fdbclient/TenantManagement.actor.h"
|
||||
#include "fdbclient/ThreadSafeTransaction.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
@ -31,17 +34,17 @@
|
|||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
struct TenantManagementWorkload : TestWorkload {
|
||||
struct TenantState {
|
||||
struct TenantData {
|
||||
int64_t id;
|
||||
Optional<TenantGroupName> tenantGroup;
|
||||
bool empty;
|
||||
|
||||
TenantState() : id(-1), empty(true) {}
|
||||
TenantState(int64_t id, Optional<TenantGroupName> tenantGroup, bool empty)
|
||||
TenantData() : id(-1), empty(true) {}
|
||||
TenantData(int64_t id, Optional<TenantGroupName> tenantGroup, bool empty)
|
||||
: id(id), tenantGroup(tenantGroup), empty(empty) {}
|
||||
};
|
||||
|
||||
std::map<TenantName, TenantState> createdTenants;
|
||||
std::map<TenantName, TenantData> createdTenants;
|
||||
int64_t maxId = -1;
|
||||
Key tenantSubspace;
|
||||
|
||||
|
@ -61,17 +64,31 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
int maxTenants;
|
||||
int maxTenantGroups;
|
||||
double testDuration;
|
||||
bool useMetacluster;
|
||||
|
||||
enum class OperationType { SPECIAL_KEYS, MANAGEMENT_DATABASE, MANAGEMENT_TRANSACTION };
|
||||
Reference<IDatabase> mvDb;
|
||||
Database dataDb;
|
||||
|
||||
static OperationType randomOperationType() {
|
||||
int randomNum = deterministicRandom()->randomInt(0, 3);
|
||||
if (randomNum == 0) {
|
||||
return OperationType::SPECIAL_KEYS;
|
||||
} else if (randomNum == 1) {
|
||||
return OperationType::MANAGEMENT_DATABASE;
|
||||
// This test exercises multiple different ways to work with tenants
|
||||
enum class OperationType {
|
||||
// Use the special key-space APIs
|
||||
SPECIAL_KEYS,
|
||||
// Use the ManagementAPI functions that take a Database object and implement a retry loop
|
||||
MANAGEMENT_DATABASE,
|
||||
// Use the ManagementAPI functions that take a Transaction object
|
||||
MANAGEMENT_TRANSACTION,
|
||||
// Use the Metacluster API, if applicable. Note: not all APIs have a metacluster variant,
|
||||
// and if there isn't one this will choose one of the other options.
|
||||
METACLUSTER
|
||||
};
|
||||
|
||||
OperationType randomOperationType() {
|
||||
double metaclusterProb = useMetacluster ? 0.9 : 0.1;
|
||||
|
||||
if (deterministicRandom()->random01() < metaclusterProb) {
|
||||
return OperationType::METACLUSTER;
|
||||
} else {
|
||||
return OperationType::MANAGEMENT_TRANSACTION;
|
||||
return (OperationType)deterministicRandom()->randomInt(0, 3);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,14 +98,42 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
testDuration = getOption(options, "testDuration"_sr, 60.0);
|
||||
|
||||
localTenantNamePrefix = format("%stenant_%d_", tenantNamePrefix.toString().c_str(), clientId);
|
||||
useMetacluster = false;
|
||||
}
|
||||
|
||||
std::string description() const override { return "TenantManagement"; }
|
||||
|
||||
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
|
||||
ACTOR Future<Void> _setup(Database cx, TenantManagementWorkload* self) {
|
||||
state Transaction tr(cx);
|
||||
Reference<IDatabase> threadSafeHandle =
|
||||
wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(cx)));
|
||||
TraceEvent("CreatedThreadSafeHandle");
|
||||
|
||||
MultiVersionApi::api->selectApiVersion(cx->apiVersion);
|
||||
self->mvDb = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
|
||||
|
||||
if (self->useMetacluster) {
|
||||
|
||||
auto extraFile = makeReference<ClusterConnectionMemoryRecord>(*g_simulator.extraDB);
|
||||
self->dataDb = Database::createDatabase(extraFile, -1);
|
||||
|
||||
if (self->clientId == 0) {
|
||||
wait(success(ManagementAPI::changeConfig(cx.getReference(), "tenant_mode=management", true)));
|
||||
|
||||
DataClusterEntry entry;
|
||||
entry.capacity.numTenantGroups = 1e9;
|
||||
wait(MetaclusterAPI::registerCluster(self->mvDb, "cluster1"_sr, *g_simulator.extraDB, entry));
|
||||
}
|
||||
} else {
|
||||
self->dataDb = cx;
|
||||
}
|
||||
state Transaction tr(self->dataDb);
|
||||
if (self->clientId == 0) {
|
||||
// Configure the tenant subspace prefix that is applied to all tenants
|
||||
// This feature isn't supported in a metacluster, so we skip it if doing a metacluster test.
|
||||
if (self->useMetacluster) {
|
||||
self->tenantSubspace = ""_sr;
|
||||
} else {
|
||||
self->tenantSubspace = makeString(deterministicRandom()->randomInt(0, 10));
|
||||
loop {
|
||||
generateRandomData(mutateString(self->tenantSubspace), self->tenantSubspace.size());
|
||||
|
@ -96,6 +141,10 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set a key outside of all tenants to make sure that our tenants aren't writing to the regular key-space
|
||||
// Also communicates the chosen tenant subspace to all other clients by storing it in a key
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
@ -109,6 +158,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
// Read the tenant subspace chosen and saved by client 0
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
@ -119,6 +169,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
wait(delay(1.0));
|
||||
tr.reset();
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
@ -148,56 +199,172 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return tenantGroup;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> createTenant(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName tenant = self->chooseTenantName(true);
|
||||
state Optional<TenantGroupName> tenantGroup = self->chooseTenantGroup();
|
||||
state bool alreadyExists = self->createdTenants.count(tenant);
|
||||
state OperationType operationType = TenantManagementWorkload::randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
|
||||
loop {
|
||||
try {
|
||||
// Creates tenant(s) using the specified operation type
|
||||
ACTOR Future<Void> createImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
std::map<TenantName, TenantMapEntry> tenantsToCreate,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
for (auto [tenant, entry] : tenantsToCreate) {
|
||||
tr->set(self->specialKeysTenantMapPrefix.withSuffix(tenant), ""_sr);
|
||||
if (tenantGroup.present()) {
|
||||
if (entry.tenantGroup.present()) {
|
||||
tr->set(self->specialKeysTenantConfigPrefix.withSuffix("tenant_group/"_sr).withSuffix(tenant),
|
||||
tenantGroup.get());
|
||||
entry.tenantGroup.get());
|
||||
}
|
||||
}
|
||||
wait(tr->commit());
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
TenantMapEntry entry;
|
||||
entry.tenantGroup = tenantGroup;
|
||||
TenantMapEntry result = wait(ManagementAPI::createTenant(cx.getReference(), tenant, entry));
|
||||
} else {
|
||||
ASSERT(tenantsToCreate.size() == 1);
|
||||
TenantMapEntry result = wait(ManagementAPI::createTenant(
|
||||
self->dataDb.getReference(), tenantsToCreate.begin()->first, tenantsToCreate.begin()->second));
|
||||
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
TenantMapEntry entry;
|
||||
entry.tenantGroup = tenantGroup;
|
||||
std::pair<TenantMapEntry, bool> _ = wait(ManagementAPI::createTenantTransaction(tr, tenant, entry));
|
||||
std::vector<Future<Void>> createFutures;
|
||||
for (auto [tenant, entry] : tenantsToCreate) {
|
||||
createFutures.push_back(success(ManagementAPI::createTenantTransaction(tr, tenant, entry)));
|
||||
}
|
||||
wait(waitForAll(createFutures));
|
||||
wait(tr->commit());
|
||||
} else {
|
||||
ASSERT(tenantsToCreate.size() == 1);
|
||||
wait(MetaclusterAPI::createTenant(
|
||||
self->mvDb, tenantsToCreate.begin()->first, tenantsToCreate.begin()->second));
|
||||
TraceEvent("MetaclusterCreatedTenant");
|
||||
}
|
||||
}
|
||||
|
||||
if (operationType != OperationType::MANAGEMENT_DATABASE && alreadyExists) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ASSERT(!alreadyExists);
|
||||
ASSERT(!tenant.startsWith("\xff"_sr));
|
||||
ACTOR Future<Void> createTenant(Database cx, TenantManagementWorkload* self) {
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
int numTenants = 1;
|
||||
|
||||
// For transaction-based operations, test creating multiple tenants in the same transaction
|
||||
/*if (operationType == OperationType::SPECIAL_KEYS || operationType == OperationType::MANAGEMENT_TRANSACTION) {
|
||||
numTenants = deterministicRandom()->randomInt(1, 5);
|
||||
}*/
|
||||
|
||||
// Tracks whether any tenant exists in the database or not. This variable is updated if we have to retry
|
||||
// the creation.
|
||||
state bool alreadyExists = false;
|
||||
|
||||
// True if any tenant name starts with \xff
|
||||
state bool hasSystemTenant = false;
|
||||
|
||||
state std::map<TenantName, TenantMapEntry> tenantsToCreate;
|
||||
for (int i = 0; i < numTenants; ++i) {
|
||||
TenantName tenant = self->chooseTenantName(true);
|
||||
TenantMapEntry entry;
|
||||
entry.tenantGroup = self->chooseTenantGroup();
|
||||
tenantsToCreate[tenant] = entry;
|
||||
|
||||
alreadyExists = alreadyExists || self->createdTenants.count(tenant);
|
||||
hasSystemTenant = tenant.startsWith("\xff"_sr);
|
||||
}
|
||||
|
||||
// If any tenant existed at the start of this function, then we expect the creation to fail or be a no-op,
|
||||
// depending on the type of create operation being executed
|
||||
state bool existedAtStart = alreadyExists;
|
||||
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
||||
loop {
|
||||
try {
|
||||
// First, attempt to create the tenants
|
||||
loop {
|
||||
try {
|
||||
Optional<Void> result =
|
||||
wait(timeout(self->createImpl(cx, tr, tenantsToCreate, operationType, self), 30));
|
||||
|
||||
if (result.present()) {
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// If we retried the creation after our initial attempt succeeded, then we proceed with the rest
|
||||
// of the creation steps normally. Otherwise, the creation happened elsewhere and we failed
|
||||
// here, so we can rethrow the error.
|
||||
if (e.code() == error_code_tenant_already_exists && !existedAtStart) {
|
||||
ASSERT(operationType == OperationType::METACLUSTER ||
|
||||
operationType == OperationType::MANAGEMENT_DATABASE);
|
||||
ASSERT(alreadyExists);
|
||||
break;
|
||||
} else {
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
// Check the state of the first created tenant
|
||||
Optional<TenantMapEntry> resultEntry =
|
||||
wait(ManagementAPI::tryGetTenant(cx.getReference(), tenantsToCreate.begin()->first));
|
||||
|
||||
if (resultEntry.present()) {
|
||||
if (resultEntry.get().tenantState == TenantState::READY) {
|
||||
// The tenant now exists, so we will retry and expect the creation to react accordingly
|
||||
alreadyExists = true;
|
||||
} else {
|
||||
// Only a metacluster tenant creation can end up in a partially created state
|
||||
// We should be able to retry and pick up where we left off
|
||||
ASSERT(operationType == OperationType::METACLUSTER);
|
||||
ASSERT(resultEntry.get().tenantState == TenantState::REGISTERING);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check that using the wrong creation type fails depending on whether we are using a metacluster
|
||||
ASSERT(self->useMetacluster == (operationType == OperationType::METACLUSTER));
|
||||
|
||||
// Transaction-based creation modes will not fail if the tenant already existed, so we can just return
|
||||
// instead.
|
||||
if ((operationType == OperationType::MANAGEMENT_TRANSACTION ||
|
||||
operationType == OperationType::SPECIAL_KEYS) &&
|
||||
existedAtStart) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ASSERT(!existedAtStart);
|
||||
|
||||
// It is not legal to create a tenant starting with \xff
|
||||
ASSERT(!hasSystemTenant);
|
||||
|
||||
state std::map<TenantName, TenantMapEntry>::iterator tenantItr;
|
||||
for (tenantItr = tenantsToCreate.begin(); tenantItr != tenantsToCreate.end(); ++tenantItr) {
|
||||
// Read the created tenant object and verify that its state is correct
|
||||
state Optional<TenantMapEntry> entry =
|
||||
wait(ManagementAPI::tryGetTenant(cx.getReference(), tenantItr->first));
|
||||
|
||||
state Optional<TenantMapEntry> entry = wait(ManagementAPI::tryGetTenant(cx.getReference(), tenant));
|
||||
ASSERT(entry.present());
|
||||
ASSERT(entry.get().id > self->maxId);
|
||||
ASSERT(entry.get().prefix.startsWith(self->tenantSubspace));
|
||||
ASSERT(entry.get().tenantGroup == tenantItr->second.tenantGroup);
|
||||
ASSERT(entry.get().tenantState == TenantState::READY);
|
||||
|
||||
if (self->useMetacluster) {
|
||||
// In a metacluster, we should also see that the tenant was created on the data cluster
|
||||
Optional<TenantMapEntry> dataEntry =
|
||||
wait(ManagementAPI::tryGetTenant(self->dataDb.getReference(), tenantItr->first));
|
||||
ASSERT(dataEntry.present());
|
||||
ASSERT(dataEntry.get().id == entry.get().id);
|
||||
ASSERT(dataEntry.get().prefix.size() == 8);
|
||||
ASSERT(dataEntry.get().tenantGroup == entry.get().tenantGroup);
|
||||
ASSERT(dataEntry.get().tenantState == TenantState::READY);
|
||||
}
|
||||
|
||||
// Update our local tenant state to include the newly created one
|
||||
self->maxId = entry.get().id;
|
||||
self->createdTenants[tenant] = TenantState(entry.get().id, tenantGroup, true);
|
||||
self->createdTenants[tenantItr->first] =
|
||||
TenantData(entry.get().id, tenantItr->second.tenantGroup, true);
|
||||
|
||||
// Randomly decide to insert a key into the tenant
|
||||
state bool insertData = deterministicRandom()->random01() < 0.5;
|
||||
if (insertData) {
|
||||
state Transaction insertTr(cx, tenant);
|
||||
state Transaction insertTr(self->dataDb, tenantItr->first);
|
||||
loop {
|
||||
try {
|
||||
insertTr.set(self->keyName, tenant);
|
||||
// The value stored in the key will be the name of the tenant
|
||||
insertTr.set(self->keyName, tenantItr->first);
|
||||
wait(insertTr.commit());
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
|
@ -205,15 +372,17 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
self->createdTenants[tenant].empty = false;
|
||||
self->createdTenants[tenantItr->first].empty = false;
|
||||
|
||||
state Transaction checkTr(cx);
|
||||
// Make sure that the key inserted correctly concatenates the tenant prefix with the relative
|
||||
// key
|
||||
state Transaction checkTr(self->dataDb);
|
||||
loop {
|
||||
try {
|
||||
checkTr.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
Optional<Value> val = wait(checkTr.get(self->keyName.withPrefix(entry.get().prefix)));
|
||||
ASSERT(val.present());
|
||||
ASSERT(val.get() == tenant);
|
||||
ASSERT(val.get() == tenantItr->first);
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
wait(checkTr.onError(e));
|
||||
|
@ -221,24 +390,42 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
wait(self->checkTenant(cx, self, tenant, self->createdTenants[tenant]));
|
||||
// Perform some final tenant validation
|
||||
wait(self->checkTenantContents(self, tenantItr->first, self->createdTenants[tenantItr->first]));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_invalid_tenant_name) {
|
||||
ASSERT(tenant.startsWith("\xff"_sr));
|
||||
ASSERT(hasSystemTenant);
|
||||
return Void();
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
} else if (e.code() == error_code_tenants_disabled) {
|
||||
ASSERT((operationType == OperationType::METACLUSTER) != self->useMetacluster);
|
||||
return Void();
|
||||
} else if (e.code() == error_code_invalid_metacluster_operation) {
|
||||
ASSERT(operationType == OperationType::METACLUSTER && !self->useMetacluster);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Database-based operations should not need to be retried
|
||||
else if (operationType == OperationType::MANAGEMENT_DATABASE ||
|
||||
operationType == OperationType::METACLUSTER) {
|
||||
if (e.code() == error_code_tenant_already_exists) {
|
||||
ASSERT(alreadyExists && operationType == OperationType::MANAGEMENT_DATABASE);
|
||||
ASSERT(existedAtStart);
|
||||
} else {
|
||||
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
|
||||
TraceEvent(SevError, "CreateTenantFailure")
|
||||
.error(e)
|
||||
.detail("TenantName", tenantsToCreate.begin()->first);
|
||||
}
|
||||
return Void();
|
||||
} else {
|
||||
}
|
||||
|
||||
// Transaction-based operations should be retried
|
||||
else {
|
||||
try {
|
||||
wait(tr->onError(e));
|
||||
} catch (Error& e) {
|
||||
for (auto [tenant, _] : tenantsToCreate) {
|
||||
TraceEvent(SevError, "CreateTenantFailure").error(e).detail("TenantName", tenant);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
}
|
||||
|
@ -246,14 +433,53 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
// Deletes the tenant or tenant range using the specified operation type
|
||||
ACTOR Future<Void> deleteImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName beginTenant,
|
||||
Optional<TenantName> endTenant,
|
||||
std::vector<TenantName> tenants,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
state int tenantIndex;
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
Key key = self->specialKeysTenantMapPrefix.withSuffix(beginTenant);
|
||||
if (endTenant.present()) {
|
||||
tr->clear(KeyRangeRef(key, self->specialKeysTenantMapPrefix.withSuffix(endTenant.get())));
|
||||
} else {
|
||||
tr->clear(key);
|
||||
}
|
||||
wait(tr->commit());
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
ASSERT(!endTenant.present() && tenants.size() == 1);
|
||||
wait(ManagementAPI::deleteTenant(self->dataDb.getReference(), beginTenant));
|
||||
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
std::vector<Future<Void>> deleteFutures;
|
||||
for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
|
||||
deleteFutures.push_back(ManagementAPI::deleteTenantTransaction(tr, tenants[tenantIndex]));
|
||||
}
|
||||
|
||||
wait(waitForAll(deleteFutures));
|
||||
wait(tr->commit());
|
||||
} else {
|
||||
ASSERT(!endTenant.present() && tenants.size() == 1);
|
||||
wait(MetaclusterAPI::deleteTenant(self->mvDb, beginTenant));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> deleteTenant(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName beginTenant = self->chooseTenantName(true);
|
||||
state OperationType operationType = TenantManagementWorkload::randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
||||
state Optional<TenantName> endTenant = operationType != OperationType::MANAGEMENT_DATABASE &&
|
||||
!beginTenant.startsWith("\xff"_sr) &&
|
||||
deterministicRandom()->random01() < 0.2
|
||||
// For transaction-based deletion, we randomly allow the deletion of a range of tenants
|
||||
state Optional<TenantName> endTenant =
|
||||
operationType != OperationType::MANAGEMENT_DATABASE && operationType != OperationType::METACLUSTER &&
|
||||
!beginTenant.startsWith("\xff"_sr) && deterministicRandom()->random01() < 0.2
|
||||
? Optional<TenantName>(self->chooseTenantName(false))
|
||||
: Optional<TenantName>();
|
||||
|
||||
|
@ -264,9 +490,17 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
auto itr = self->createdTenants.find(beginTenant);
|
||||
|
||||
// True if the beginTenant should exist and be deletable. This is updated if a deletion fails and gets retried.
|
||||
state bool alreadyExists = itr != self->createdTenants.end();
|
||||
|
||||
// True if the beginTenant existed at the start of this function
|
||||
state bool existedAtStart = alreadyExists;
|
||||
|
||||
// True if all of the tenants in the range are empty and can be deleted
|
||||
state bool isEmpty = true;
|
||||
|
||||
// Collect a list of all tenants that we expect should be deleted by this operation
|
||||
state std::vector<TenantName> tenants;
|
||||
if (!endTenant.present()) {
|
||||
tenants.push_back(beginTenant);
|
||||
|
@ -278,12 +512,14 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
// Check whether each tenant is empty.
|
||||
state int tenantIndex;
|
||||
try {
|
||||
if (alreadyExists || endTenant.present()) {
|
||||
for (tenantIndex = 0; tenantIndex < tenants.size(); ++tenantIndex) {
|
||||
// For most tenants, we will delete the contents and make them empty
|
||||
if (deterministicRandom()->random01() < 0.9) {
|
||||
state Transaction clearTr(cx, tenants[tenantIndex]);
|
||||
state Transaction clearTr(self->dataDb, tenants[tenantIndex]);
|
||||
loop {
|
||||
try {
|
||||
clearTr.clear(self->keyName);
|
||||
|
@ -296,7 +532,9 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
wait(clearTr.onError(e));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
}
|
||||
// Otherwise, we will just report the current emptiness of the tenant
|
||||
else {
|
||||
auto itr = self->createdTenants.find(tenants[tenantIndex]);
|
||||
ASSERT(itr != self->createdTenants.end());
|
||||
isEmpty = isEmpty && itr->second.empty;
|
||||
|
@ -313,37 +551,70 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
|
||||
loop {
|
||||
try {
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
|
||||
Key key = self->specialKeysTenantMapPrefix.withSuffix(beginTenant);
|
||||
if (endTenant.present()) {
|
||||
tr->clear(KeyRangeRef(key, self->specialKeysTenantMapPrefix.withSuffix(endTenant.get())));
|
||||
// Attempt to delete the tenant(s)
|
||||
loop {
|
||||
try {
|
||||
Optional<Void> result = wait(timeout(
|
||||
self->deleteImpl(cx, tr, beginTenant, endTenant, tenants, operationType, self), 30));
|
||||
|
||||
if (result.present()) {
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
// If we retried the deletion after our initial attempt succeeded, then we proceed with the rest
|
||||
// of the deletion steps normally. Otherwise, the deletion happened elsewhere and we failed
|
||||
// here, so we can rethrow the error.
|
||||
if (e.code() == error_code_tenant_not_found && existedAtStart) {
|
||||
ASSERT(operationType == OperationType::METACLUSTER ||
|
||||
operationType == OperationType::MANAGEMENT_DATABASE);
|
||||
ASSERT(!alreadyExists);
|
||||
break;
|
||||
} else {
|
||||
tr->clear(key);
|
||||
throw;
|
||||
}
|
||||
wait(tr->commit());
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
ASSERT(tenants.size() == 1);
|
||||
for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
|
||||
wait(ManagementAPI::deleteTenant(cx.getReference(), tenants[tenantIndex]));
|
||||
}
|
||||
|
||||
// Check the state of the first deleted tenant
|
||||
Optional<TenantMapEntry> resultEntry =
|
||||
wait(ManagementAPI::tryGetTenant(cx.getReference(), *tenants.begin()));
|
||||
|
||||
if (!resultEntry.present()) {
|
||||
alreadyExists = false;
|
||||
} else if (resultEntry.get().tenantState == TenantState::REMOVING) {
|
||||
ASSERT(operationType == OperationType::METACLUSTER);
|
||||
} else {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
std::vector<Future<Void>> deleteFutures;
|
||||
for (tenantIndex = 0; tenantIndex != tenants.size(); ++tenantIndex) {
|
||||
deleteFutures.push_back(ManagementAPI::deleteTenantTransaction(tr, tenants[tenantIndex]));
|
||||
ASSERT(resultEntry.get().tenantState == TenantState::READY);
|
||||
}
|
||||
}
|
||||
|
||||
wait(waitForAll(deleteFutures));
|
||||
wait(tr->commit());
|
||||
}
|
||||
|
||||
if (!alreadyExists && !endTenant.present() && operationType != OperationType::MANAGEMENT_DATABASE) {
|
||||
// The management transaction operation is a no-op if there are no tenants to delete in a range delete
|
||||
if (tenants.size() == 0 && operationType == OperationType::MANAGEMENT_TRANSACTION) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ASSERT(alreadyExists || endTenant.present());
|
||||
// The special keys operation is a no-op if the begin and end tenant are equal (i.e. the range is empty)
|
||||
if (endTenant.present() && beginTenant == endTenant.get() &&
|
||||
operationType == OperationType::SPECIAL_KEYS) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Check that using the wrong deletion type fails depending on whether we are using a metacluster
|
||||
ASSERT(self->useMetacluster == (operationType == OperationType::METACLUSTER));
|
||||
|
||||
// Transaction-based operations do not fail if the tenant isn't present. If we attempted to delete a
|
||||
// single tenant that didn't exist, we can just return.
|
||||
if (!existedAtStart && !endTenant.present() &&
|
||||
(operationType == OperationType::MANAGEMENT_TRANSACTION ||
|
||||
operationType == OperationType::SPECIAL_KEYS)) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
ASSERT(existedAtStart || endTenant.present());
|
||||
|
||||
// Deletion should not succeed if any tenant in the range wasn't empty
|
||||
ASSERT(isEmpty);
|
||||
|
||||
// Update our local state to remove the deleted tenants
|
||||
for (auto tenant : tenants) {
|
||||
self->createdTenants.erase(tenant);
|
||||
}
|
||||
|
@ -352,9 +623,19 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
if (e.code() == error_code_tenant_not_empty) {
|
||||
ASSERT(!isEmpty);
|
||||
return Void();
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
} else if (e.code() == error_code_tenants_disabled) {
|
||||
ASSERT((operationType == OperationType::METACLUSTER) != self->useMetacluster);
|
||||
return Void();
|
||||
} else if (e.code() == error_code_invalid_metacluster_operation) {
|
||||
ASSERT(operationType == OperationType::METACLUSTER && !self->useMetacluster);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Database-based operations do not need to be retried
|
||||
else if (operationType == OperationType::MANAGEMENT_DATABASE ||
|
||||
operationType == OperationType::METACLUSTER) {
|
||||
if (e.code() == error_code_tenant_not_found) {
|
||||
ASSERT(!alreadyExists && !endTenant.present());
|
||||
ASSERT(!existedAtStart && !endTenant.present());
|
||||
} else {
|
||||
TraceEvent(SevError, "DeleteTenantFailure")
|
||||
.error(e)
|
||||
|
@ -362,7 +643,10 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
.detail("EndTenant", endTenant);
|
||||
}
|
||||
return Void();
|
||||
} else {
|
||||
}
|
||||
|
||||
// Transaction-based operations should be retried
|
||||
else {
|
||||
try {
|
||||
wait(tr->onError(e));
|
||||
} catch (Error& e) {
|
||||
|
@ -377,17 +661,22 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> checkTenant(Database cx,
|
||||
TenantManagementWorkload* self,
|
||||
TenantName tenant,
|
||||
TenantState tenantState) {
|
||||
state Transaction tr(cx, tenant);
|
||||
// Performs some validation on a tenant's contents
|
||||
ACTOR Future<Void> checkTenantContents(TenantManagementWorkload* self, TenantName tenant, TenantData tenantData) {
|
||||
state Transaction tr(self->dataDb, tenant);
|
||||
loop {
|
||||
try {
|
||||
// We only every store a single key in each tenant. Therefore we expect a range read of the entire
|
||||
// tenant to return either 0 or 1 keys, depending on whether that key has been set.
|
||||
state RangeResult result = wait(tr.getRange(KeyRangeRef(""_sr, "\xff"_sr), 2));
|
||||
if (tenantState.empty) {
|
||||
|
||||
// An empty tenant should have no data
|
||||
if (tenantData.empty) {
|
||||
ASSERT(result.size() == 0);
|
||||
} else {
|
||||
}
|
||||
// A non-empty tenant should have our single key. The value of that key should be the name of the
|
||||
// tenant.
|
||||
else {
|
||||
ASSERT(result.size() == 1);
|
||||
ASSERT(result[0].key == self->keyName);
|
||||
ASSERT(result[0].value == tenant);
|
||||
|
@ -401,6 +690,8 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return Void();
|
||||
}
|
||||
|
||||
// Convert the JSON document returned by the special-key space when reading tenant metadata
|
||||
// into a TenantMapEntry
|
||||
static TenantMapEntry jsonToTenantMapEntry(ValueRef tenantJson) {
|
||||
json_spirit::mValue jsonObject;
|
||||
json_spirit::read_string(tenantJson.toString(), jsonObject);
|
||||
|
@ -435,20 +726,12 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
return entry;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getTenant(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName tenant = self->chooseTenantName(true);
|
||||
auto itr = self->createdTenants.find(tenant);
|
||||
state bool alreadyExists = itr != self->createdTenants.end();
|
||||
state OperationType operationType = TenantManagementWorkload::randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
|
||||
state TenantState tenantState;
|
||||
if (alreadyExists) {
|
||||
tenantState = itr->second;
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
// Gets the metadata for a tenant using the specified operation type
|
||||
ACTOR Future<TenantMapEntry> getImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName tenant,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
state TenantMapEntry entry;
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
Key key = self->specialKeysTenantMapPrefix.withSuffix(tenant);
|
||||
|
@ -458,17 +741,42 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
entry = TenantManagementWorkload::jsonToTenantMapEntry(value.get());
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
TenantMapEntry _entry = wait(ManagementAPI::getTenant(cx.getReference(), tenant));
|
||||
TenantMapEntry _entry = wait(ManagementAPI::getTenant(self->dataDb.getReference(), tenant));
|
||||
entry = _entry;
|
||||
} else {
|
||||
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
TenantMapEntry _entry = wait(ManagementAPI::getTenantTransaction(tr, tenant));
|
||||
entry = _entry;
|
||||
} else {
|
||||
TenantMapEntry _entry = wait(ManagementAPI::getTenant(self->mvDb, tenant));
|
||||
entry = _entry;
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getTenant(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName tenant = self->chooseTenantName(true);
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
||||
// True if the tenant should should exist and return a result
|
||||
auto itr = self->createdTenants.find(tenant);
|
||||
state bool alreadyExists = itr != self->createdTenants.end();
|
||||
|
||||
state TenantData tenantData;
|
||||
if (alreadyExists) {
|
||||
tenantData = itr->second;
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
// Get the tenant metadata and check that it matches our local state
|
||||
state TenantMapEntry entry = wait(self->getImpl(cx, tr, tenant, operationType, self));
|
||||
ASSERT(alreadyExists);
|
||||
ASSERT(entry.id == tenantState.id);
|
||||
ASSERT(entry.tenantGroup == tenantState.tenantGroup);
|
||||
wait(self->checkTenant(cx, self, tenant, tenantState));
|
||||
ASSERT(entry.id == tenantData.id);
|
||||
ASSERT(entry.tenantGroup == tenantData.tenantGroup);
|
||||
wait(self->checkTenantContents(self, tenant, tenantData));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
state bool retry = true;
|
||||
|
@ -477,7 +785,11 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
if (e.code() == error_code_tenant_not_found) {
|
||||
ASSERT(!alreadyExists);
|
||||
return Void();
|
||||
} else if (operationType != OperationType::MANAGEMENT_DATABASE) {
|
||||
}
|
||||
|
||||
// Transaction-based operations should retry
|
||||
else if (operationType == OperationType::MANAGEMENT_TRANSACTION ||
|
||||
operationType == OperationType::SPECIAL_KEYS) {
|
||||
try {
|
||||
wait(tr->onError(e));
|
||||
} catch (Error& e) {
|
||||
|
@ -494,20 +806,16 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> listTenants(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName beginTenant = self->chooseTenantName(false);
|
||||
state TenantName endTenant = self->chooseTenantName(false);
|
||||
state int limit = std::min(CLIENT_KNOBS->TOO_MANY, deterministicRandom()->randomInt(1, self->maxTenants * 2));
|
||||
state OperationType operationType = TenantManagementWorkload::randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
|
||||
if (beginTenant > endTenant) {
|
||||
std::swap(beginTenant, endTenant);
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
// Gets a list of tenants using the specified operation type
|
||||
ACTOR Future<std::map<TenantName, TenantMapEntry>> listImpl(Database cx,
|
||||
Reference<ReadYourWritesTransaction> tr,
|
||||
TenantName beginTenant,
|
||||
TenantName endTenant,
|
||||
int limit,
|
||||
OperationType operationType,
|
||||
TenantManagementWorkload* self) {
|
||||
state std::map<TenantName, TenantMapEntry> tenants;
|
||||
|
||||
if (operationType == OperationType::SPECIAL_KEYS) {
|
||||
KeyRange range = KeyRangeRef(beginTenant, endTenant).withPrefix(self->specialKeysTenantMapPrefix);
|
||||
RangeResult results = wait(tr->getRange(range, limit));
|
||||
|
@ -517,17 +825,42 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
|
||||
std::map<TenantName, TenantMapEntry> _tenants =
|
||||
wait(ManagementAPI::listTenants(cx.getReference(), beginTenant, endTenant, limit));
|
||||
wait(ManagementAPI::listTenants(self->dataDb.getReference(), beginTenant, endTenant, limit));
|
||||
tenants = _tenants;
|
||||
} else {
|
||||
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
std::map<TenantName, TenantMapEntry> _tenants =
|
||||
wait(ManagementAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));
|
||||
tenants = _tenants;
|
||||
} else {
|
||||
std::map<TenantName, TenantMapEntry> _tenants =
|
||||
wait(ManagementAPI::listTenants(self->mvDb, beginTenant, endTenant, limit));
|
||||
tenants = _tenants;
|
||||
}
|
||||
|
||||
return tenants;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> listTenants(Database cx, TenantManagementWorkload* self) {
|
||||
state TenantName beginTenant = self->chooseTenantName(false);
|
||||
state TenantName endTenant = self->chooseTenantName(false);
|
||||
state int limit = std::min(CLIENT_KNOBS->TOO_MANY, deterministicRandom()->randomInt(1, self->maxTenants * 2));
|
||||
state OperationType operationType = self->randomOperationType();
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
|
||||
|
||||
if (beginTenant > endTenant) {
|
||||
std::swap(beginTenant, endTenant);
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
// Attempt to read the chosen list of tenants
|
||||
state std::map<TenantName, TenantMapEntry> tenants =
|
||||
wait(self->listImpl(cx, tr, beginTenant, endTenant, limit, operationType, self));
|
||||
|
||||
ASSERT(tenants.size() <= limit);
|
||||
|
||||
// Compare the resulting tenant list to the list we expected to get
|
||||
auto localItr = self->createdTenants.lower_bound(beginTenant);
|
||||
auto tenantMapItr = tenants.begin();
|
||||
for (; tenantMapItr != tenants.end(); ++tenantMapItr, ++localItr) {
|
||||
|
@ -535,18 +868,17 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
ASSERT(localItr->first == tenantMapItr->first);
|
||||
}
|
||||
|
||||
if (!(tenants.size() == limit || localItr == self->createdTenants.end())) {
|
||||
for (auto tenant : self->createdTenants) {
|
||||
TraceEvent("ExistingTenant").detail("Tenant", tenant.first);
|
||||
}
|
||||
}
|
||||
// Make sure the list terminated at the right spot
|
||||
ASSERT(tenants.size() == limit || localItr == self->createdTenants.end() ||
|
||||
localItr->first >= endTenant);
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
state bool retry = true;
|
||||
state Error error = e;
|
||||
if (operationType != OperationType::MANAGEMENT_DATABASE) {
|
||||
|
||||
// Transaction-based operations need to be retried
|
||||
if (operationType == OperationType::MANAGEMENT_TRANSACTION ||
|
||||
operationType == OperationType::SPECIAL_KEYS) {
|
||||
try {
|
||||
wait(tr->onError(e));
|
||||
} catch (Error& e) {
|
||||
|
@ -588,8 +920,11 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
|
||||
Future<bool> check(Database const& cx) override { return _check(cx, this); }
|
||||
ACTOR Future<bool> _check(Database cx, TenantManagementWorkload* self) {
|
||||
state Transaction tr(cx);
|
||||
state Transaction tr(self->dataDb);
|
||||
|
||||
// Check that the key we set outside of the tenant is present and has the correct value
|
||||
// This is the same key we set inside some of our tenants, so this checks that no tenant
|
||||
// writes accidentally happened in the raw key-space
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::RAW_ACCESS);
|
||||
|
@ -601,24 +936,43 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
state std::map<TenantName, TenantState>::iterator itr = self->createdTenants.begin();
|
||||
// Verify that the set of tenants in the database matches our local state
|
||||
state std::map<TenantName, TenantData>::iterator localItr = self->createdTenants.begin();
|
||||
state std::vector<Future<Void>> checkTenants;
|
||||
state TenantName beginTenant = ""_sr.withPrefix(self->localTenantNamePrefix);
|
||||
state TenantName endTenant = "\xff\xff"_sr.withPrefix(self->localTenantNamePrefix);
|
||||
|
||||
loop {
|
||||
std::map<TenantName, TenantMapEntry> tenants =
|
||||
// Read the tenant map from the primary cluster (either management cluster in a metacluster, or just the
|
||||
// cluster otherwise).
|
||||
state std::map<TenantName, TenantMapEntry> tenants =
|
||||
wait(ManagementAPI::listTenants(cx.getReference(), beginTenant, endTenant, 1000));
|
||||
|
||||
// Read the tenant map from the data cluster. If this is not a metacluster it will read from the same
|
||||
// database as above, making it superfluous but still correct.
|
||||
std::map<TenantName, TenantMapEntry> dataClusterTenants =
|
||||
wait(ManagementAPI::listTenants(self->dataDb.getReference(), beginTenant, endTenant, 1000));
|
||||
|
||||
auto managementItr = tenants.begin();
|
||||
auto dataItr = dataClusterTenants.begin();
|
||||
|
||||
TenantNameRef lastTenant;
|
||||
for (auto tenant : tenants) {
|
||||
ASSERT(itr != self->createdTenants.end());
|
||||
ASSERT(tenant.first == itr->first);
|
||||
checkTenants.push_back(self->checkTenant(cx, self, tenant.first, itr->second));
|
||||
lastTenant = tenant.first;
|
||||
++itr;
|
||||
while (managementItr != tenants.end()) {
|
||||
ASSERT(localItr != self->createdTenants.end());
|
||||
ASSERT(dataItr != dataClusterTenants.end());
|
||||
ASSERT(managementItr->first == localItr->first);
|
||||
ASSERT(managementItr->first == dataItr->first);
|
||||
|
||||
checkTenants.push_back(self->checkTenantContents(self, managementItr->first, localItr->second));
|
||||
lastTenant = managementItr->first;
|
||||
|
||||
++localItr;
|
||||
++managementItr;
|
||||
++dataItr;
|
||||
}
|
||||
|
||||
ASSERT(dataItr == dataClusterTenants.end());
|
||||
|
||||
if (tenants.size() < 1000) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -626,7 +980,7 @@ struct TenantManagementWorkload : TestWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(itr == self->createdTenants.end());
|
||||
ASSERT(localItr == self->createdTenants.end());
|
||||
wait(waitForAll(checkTenants));
|
||||
|
||||
return true;
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
#include "flow/Knobs.h"
|
||||
#include "flow/Util.h"
|
||||
#include "flow/IndexedSet.h"
|
||||
#include "flow/ThreadHelper.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
#ifdef _MSC_VER
|
||||
|
@ -1908,6 +1909,39 @@ Future<T> forward(Future<T> from, Promise<T> to) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR template <class Transaction>
|
||||
Future<Void> buggifiedCommit(Transaction tr, bool buggify, int delayDuration = 60.0) {
|
||||
state int buggifyPoint = 0;
|
||||
if (buggify && deterministicRandom()->random01() < 0.25) {
|
||||
buggifyPoint = deterministicRandom()->randomInt(1, 5);
|
||||
}
|
||||
|
||||
// Simulate an unknown result that didn't commit
|
||||
if (buggifyPoint == 1) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
// Simulate a long delay before commit that could trigger a timeout
|
||||
if (buggifyPoint == 2) {
|
||||
wait(delay(delayDuration));
|
||||
}
|
||||
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
|
||||
// Simulate a long delay that could trigger a timeout where the transaction
|
||||
// successfully committed
|
||||
if (buggifyPoint == 3) {
|
||||
wait(delay(delayDuration));
|
||||
}
|
||||
|
||||
// Simulate an unknown result that did commit
|
||||
if (buggifyPoint == 4) {
|
||||
throw commit_unknown_result();
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Monad
|
||||
|
||||
ACTOR template <class Fun, class T>
|
||||
|
|
Loading…
Reference in New Issue