introduce tenant_batch_size argument and change tenant creation/deletion to do many in one transaction. Also introduce getTenant to fdb_api.hpp
This commit is contained in:
parent
9158d79e8e
commit
4bd586a8e4
|
@ -397,6 +397,7 @@ template <typename VarTraits>
|
|||
class TypedFuture : public Future {
|
||||
friend class Future;
|
||||
friend class Transaction;
|
||||
friend class Tenant;
|
||||
using SelfType = TypedFuture<VarTraits>;
|
||||
using Future::Future;
|
||||
// hide type-unsafe inherited functions
|
||||
|
@ -589,6 +590,7 @@ class Tenant final {
|
|||
std::shared_ptr<native::FDBTenant> tenant;
|
||||
|
||||
static constexpr CharsRef tenantManagementMapPrefix = "\xff\xff/management/tenant_map/";
|
||||
static constexpr CharsRef tenantMapPrefix = "\xff/tenantMap/";
|
||||
|
||||
explicit Tenant(native::FDBTenant* tenant_raw) {
|
||||
if (tenant_raw)
|
||||
|
@ -613,6 +615,13 @@ public:
|
|||
tr.clear(toBytesRef(fmt::format("{}{}", tenantManagementMapPrefix, toCharsRef(name))));
|
||||
}
|
||||
|
||||
static TypedFuture<future_var::KeyRef> getTenant(Transaction tr, BytesRef name) {
|
||||
tr.setOption(FDBTransactionOption::FDB_TR_OPTION_READ_SYSTEM_KEYS, BytesRef());
|
||||
tr.setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE, BytesRef());
|
||||
tr.setOption(FDBTransactionOption::FDB_TR_OPTION_RAW_ACCESS, BytesRef());
|
||||
return tr.get(toBytesRef(fmt::format("{}{}", tenantMapPrefix, toCharsRef(name))), false);
|
||||
}
|
||||
|
||||
Transaction createTransaction() {
|
||||
auto tx_native = static_cast<native::FDBTransaction*>(nullptr);
|
||||
auto err = Error(native::fdb_tenant_create_transaction(tenant.get(), &tx_native));
|
||||
|
|
|
@ -120,7 +120,8 @@ int cleanup(Database db, Arguments const& args) {
|
|||
|
||||
auto watch = Stopwatch(StartAtCtor{});
|
||||
|
||||
int num_iterations = (args.total_tenants > 1) ? args.total_tenants : 1;
|
||||
// Only need to issue range clears to the active tenants
|
||||
int num_iterations = (args.active_tenants > 1) ? args.active_tenants : 1;
|
||||
for (int i = 0; i < num_iterations; ++i) {
|
||||
// If args.total_tenants is zero, this will use a non-tenant txn and perform a single range clear.
|
||||
// If 1, it will use a tenant txn and do a single range clear instead.
|
||||
|
@ -139,22 +140,31 @@ int cleanup(Database db, Arguments const& args) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If tenants are specified, also delete the tenant after clearing out its keyspace
|
||||
if (args.total_tenants > 0) {
|
||||
Transaction systemTx = db.createTransaction();
|
||||
while (true) {
|
||||
Tenant::deleteTenant(systemTx, toBytesRef("tenant" + std::to_string(i)));
|
||||
auto future_commit = systemTx.commit();
|
||||
const auto rc = waitAndHandleError(systemTx, future_commit, "DELETE_TENANT");
|
||||
if (rc == FutureRC::OK) {
|
||||
break;
|
||||
} else if (rc == FutureRC::RETRY || rc == FutureRC::CONFLICT) {
|
||||
// tx already reset
|
||||
continue;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
// Delete tenants in batches after each keyspace is clean
|
||||
if (args.total_tenants > 0) {
|
||||
Transaction systemTx = db.createTransaction();
|
||||
int batch_size = args.tenant_batch_size;
|
||||
int batches = (args.total_tenants + batch_size - 1) / batch_size;
|
||||
for (int batch = 0; batch < batches; ++batch) {
|
||||
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
|
||||
std::string tenant_name = "tenant" + std::to_string(i);
|
||||
Tenant::deleteTenant(systemTx, toBytesRef(tenant_name));
|
||||
}
|
||||
auto future_commit = systemTx.commit();
|
||||
const auto rc = waitAndHandleError(systemTx, future_commit, "DELETE_TENANT");
|
||||
if (rc == FutureRC::OK) {
|
||||
// Keep going with reset transaction if commit was successful
|
||||
systemTx.reset();
|
||||
} else if (rc == FutureRC::RETRY) {
|
||||
// We want to retry this batch, so decrement the number
|
||||
// and go back through the loop to get the same value
|
||||
// Transaction is already reset
|
||||
--batch;
|
||||
} else {
|
||||
// Abort
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -182,27 +192,52 @@ int populate(Database db,
|
|||
auto watch_tx = Stopwatch(watch_total.getStart());
|
||||
auto watch_trace = Stopwatch(watch_total.getStart());
|
||||
|
||||
Transaction systemTx = db.createTransaction();
|
||||
// This can be parameterized for mako or just left hardcoded to
|
||||
// whatever value we find suitable
|
||||
int batch_size = 1000;
|
||||
int batches = (args.total_tenants + batch_size - 1) / batch_size;
|
||||
for (int batch = 0; batch < batches; ++batch) {
|
||||
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
|
||||
std::string tenant_name = "tenant" + std::to_string(i);
|
||||
Tenant::createTenant(systemTx, toBytesRef(tenant_name));
|
||||
if (args.total_tenants > 0) {
|
||||
Transaction systemTx = db.createTransaction();
|
||||
// Have one thread create all the tenants, then let the rest help with data population
|
||||
if (worker_id == 0 && thread_id == 0) {
|
||||
int batch_size = args.tenant_batch_size;
|
||||
int batches = (args.total_tenants + batch_size - 1) / batch_size;
|
||||
for (int batch = 0; batch < batches; ++batch) {
|
||||
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
|
||||
std::string tenant_name = "tenant" + std::to_string(i);
|
||||
Tenant::createTenant(systemTx, toBytesRef(tenant_name));
|
||||
}
|
||||
auto future_commit = systemTx.commit();
|
||||
const auto rc = waitAndHandleError(systemTx, future_commit, "CREATE_TENANT");
|
||||
if (rc == FutureRC::OK) {
|
||||
// Keep going with reset transaction if commit was successful
|
||||
systemTx.reset();
|
||||
} else if (rc == FutureRC::RETRY) {
|
||||
// We want to retry this batch, so decrement the number
|
||||
// and go back through the loop to get the same value
|
||||
// Transaction is already reset
|
||||
--batch;
|
||||
} else {
|
||||
// Abort
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
std::string last_tenant_name = "tenant" + std::to_string(args.total_tenants - 1);
|
||||
while (true) {
|
||||
auto result = Tenant::getTenant(systemTx, toBytesRef(last_tenant_name));
|
||||
const auto rc = waitAndHandleError(systemTx, result, "GET_TENANT");
|
||||
if (rc == FutureRC::OK) {
|
||||
// If we get valid tenant metadata, the main thread has finished
|
||||
if (result.get() != BytesRef()) {
|
||||
break;
|
||||
}
|
||||
systemTx.reset();
|
||||
} else if (rc == FutureRC::RETRY) {
|
||||
continue;
|
||||
} else {
|
||||
// Abort
|
||||
return -1;
|
||||
}
|
||||
usleep(1000);
|
||||
}
|
||||
}
|
||||
auto future_commit = systemTx.commit();
|
||||
const auto rc = waitAndHandleError(systemTx, future_commit, "CREATE_TENANT");
|
||||
if (rc == FutureRC::RETRY) {
|
||||
// We want to retry this batch, so decrement the number
|
||||
// and go back through the loop to get the same value
|
||||
--batch;
|
||||
}
|
||||
// Otherwise, keep going if commit was successful (FutureRC::OK)
|
||||
// If not a retryable error, expected to be the error
|
||||
// tenant_already_exists, meaning another thread finished creating it
|
||||
systemTx.reset();
|
||||
}
|
||||
// mimic typical tenant usage: keep tenants in memory
|
||||
// and create transactions as needed
|
||||
|
@ -884,6 +919,7 @@ int initArguments(Arguments& args) {
|
|||
args.value_length = 16;
|
||||
args.active_tenants = 0;
|
||||
args.total_tenants = 0;
|
||||
args.tenant_batch_size = 10000;
|
||||
args.zipf = 0;
|
||||
args.commit_get = 0;
|
||||
args.verbose = 1;
|
||||
|
@ -1060,6 +1096,7 @@ void usage() {
|
|||
printf("%-24s %s\n", " --vallen=LENGTH", "Specify the value lengths");
|
||||
printf("%-24s %s\n", " --active_tenants=ACTIVE_TENANTS", "Specify the number of tenants to use");
|
||||
printf("%-24s %s\n", " --total_tenants=TOTAL_TENANTS", "Specify the number of tenants to create");
|
||||
printf("%-24s %s\n", " --tenant_batch_size=SIZE", "Specify how many tenants to create/delete per transaction");
|
||||
printf("%-24s %s\n", "-x, --transaction=SPEC", "Transaction specification");
|
||||
printf("%-24s %s\n", " --tps|--tpsmax=TPS", "Specify the target max TPS");
|
||||
printf("%-24s %s\n", " --tpsmin=TPS", "Specify the target min TPS");
|
||||
|
@ -1117,6 +1154,7 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
|
|||
{ "vallen", required_argument, NULL, ARG_VALLEN },
|
||||
{ "active_tenants", required_argument, NULL, ARG_ACTIVE_TENANTS },
|
||||
{ "total_tenants", required_argument, NULL, ARG_TOTAL_TENANTS },
|
||||
{ "tenant_batch_size", required_argument, NULL, ARG_TENANT_BATCH_SIZE },
|
||||
{ "transaction", required_argument, NULL, 'x' },
|
||||
{ "tps", required_argument, NULL, ARG_TPS },
|
||||
{ "tpsmax", required_argument, NULL, ARG_TPSMAX },
|
||||
|
@ -1239,6 +1277,9 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
|
|||
case ARG_TOTAL_TENANTS:
|
||||
args.total_tenants = atoi(optarg);
|
||||
break;
|
||||
case ARG_TENANT_BATCH_SIZE:
|
||||
args.tenant_batch_size = atoi(optarg);
|
||||
break;
|
||||
case ARG_TPS:
|
||||
case ARG_TPSMAX:
|
||||
args.tpsmax = atoi(optarg);
|
||||
|
@ -1432,6 +1473,10 @@ int validateArguments(Arguments const& args) {
|
|||
logr.error("--active_tenants must be less than or equal to --total_tenants");
|
||||
return -1;
|
||||
}
|
||||
if (args.tenant_batch_size < 1) {
|
||||
logr.error("--tenant_batch_size must be at least 1");
|
||||
return -1;
|
||||
}
|
||||
if (args.mode == MODE_RUN) {
|
||||
if ((args.seconds > 0) && (args.iteration > 0)) {
|
||||
logr.error("Cannot specify seconds and iteration together");
|
||||
|
|
|
@ -52,6 +52,7 @@ enum ArgKind {
|
|||
ARG_VALLEN,
|
||||
ARG_ACTIVE_TENANTS,
|
||||
ARG_TOTAL_TENANTS,
|
||||
ARG_TENANT_BATCH_SIZE,
|
||||
ARG_TPS,
|
||||
ARG_ASYNC,
|
||||
ARG_COMMITGET,
|
||||
|
@ -148,6 +149,7 @@ struct Arguments {
|
|||
int value_length;
|
||||
int active_tenants;
|
||||
int total_tenants;
|
||||
int tenant_batch_size;
|
||||
int zipf;
|
||||
int commit_get;
|
||||
int verbose;
|
||||
|
|
Loading…
Reference in New Issue