Merge pull request #7402 from sfc-gh-jfu/jfu-mako-active-tenants

Introduce concept of "active" versus "total" tenants in mako
This commit is contained in:
A.J. Beamon 2022-07-06 14:23:11 -07:00 committed by GitHub
commit ae6d6e0e72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 221 additions and 62 deletions

View File

@ -91,6 +91,21 @@ inline int intSize(BytesRef b) {
return static_cast<int>(b.size());
}
// Returns a ByteString built from `s` with every trailing 0xff byte dropped and the
// last remaining byte incremented by one.
//
// Precondition: `s` must contain at least one byte that is not 0xff. An empty or
// all-0xff input has no such result and trips the assertion below.
template <template <class...> class StringLike, class Char>
ByteString strinc(const StringLike<Char>& s) {
    // Scan backwards past the run of trailing 0xff bytes.
    int last = s.size() - 1;
    while (last >= 0 && s[last] == 255) {
        --last;
    }
    // Must not be called with a string that consists only of zero or more '\xff' bytes.
    assert(last >= 0);
    // Keep everything up to and including the last non-0xff byte, then bump that byte.
    ByteString bumped(s.substr(0, last + 1));
    ++bumped[bumped.size() - 1];
    return bumped;
}
class Error {
public:
using CodeType = native::fdb_error_t;
@ -387,6 +402,7 @@ template <typename VarTraits>
class TypedFuture : public Future {
friend class Future;
friend class Transaction;
friend class Tenant;
using SelfType = TypedFuture<VarTraits>;
using Future::Future;
// hide type-unsafe inherited functions
@ -611,6 +627,13 @@ public:
tr.clear(toBytesRef(fmt::format("{}{}", tenantManagementMapPrefix, toCharsRef(name))));
}
// Issues a read on `tr` for the tenant-map entry of `name` and returns the pending future.
//
// The key read is tenantManagementMapPrefix followed by the tenant name. Before the read,
// the READ_SYSTEM_KEYS, LOCK_AWARE and RAW_ACCESS options are set on `tr`, each with an
// empty payload. The caller is responsible for waiting on the returned future.
static TypedFuture<future_var::ValueRef> getTenant(Transaction tr, BytesRef name) {
    const BytesRef no_payload{};
    tr.setOption(FDBTransactionOption::FDB_TR_OPTION_READ_SYSTEM_KEYS, no_payload);
    tr.setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE, no_payload);
    tr.setOption(FDBTransactionOption::FDB_TR_OPTION_RAW_ACCESS, no_payload);
    const auto tenant_key = fmt::format("{}{}", tenantManagementMapPrefix, toCharsRef(name));
    // NOTE(review): the trailing `false` is presumably the snapshot flag -- confirm against Transaction::get.
    return tr.get(toBytesRef(tenant_key), false);
}
Transaction createTransaction() {
auto tx_native = static_cast<native::FDBTransaction*>(nullptr);
auto err = Error(native::fdb_tenant_create_transaction(tenant.get(), &tx_native));

View File

@ -66,7 +66,8 @@ namespace mako {
struct alignas(64) ThreadArgs {
int worker_id;
int thread_id;
int tenants;
int active_tenants;
int total_tenants;
pid_t parent_id;
Arguments const* args;
shared_memory::Access shm;
@ -82,11 +83,11 @@ thread_local Logger logr = Logger(MainProcess{}, VERBOSE_DEFAULT);
Transaction createNewTransaction(Database db, Arguments const& args, int id = -1, Tenant* tenants = nullptr) {
// No tenants specified
if (args.tenants <= 0) {
if (args.active_tenants <= 0) {
return db.createTransaction();
}
// Create Tenant Transaction
int tenant_id = (id == -1) ? urand(0, args.tenants - 1) : id;
int tenant_id = (id == -1) ? urand(0, args.active_tenants - 1) : id;
// If provided tenants array, use it
if (tenants) {
return tenants[tenant_id].createTransaction();
@ -97,6 +98,23 @@ Transaction createNewTransaction(Database db, Arguments const& args, int id = -1
return t.createTransaction();
}
// Reverses the order of the eight bytes of `input`: the least significant byte of the
// argument becomes the most significant byte of the result, and so on.
uint64_t byteswapHelper(uint64_t input) {
    uint64_t reversed = 0;
    // Peel bytes off the low end of `input` and push them onto the low end of `reversed`,
    // so the first byte taken ends up highest after all eight shifts.
    for (int remaining = 8; remaining > 0; --remaining) {
        reversed = (reversed << 8) + (input & 0xFF);
        input >>= 8;
    }
    return reversed;
}
// Writes the 8-byte tenant prefix derived from `id` into the first eight bytes of `s`.
//
// The prefix is the in-memory representation of byteswapHelper(id), i.e. the bytes of
// `id` in reversed order relative to how this host stores a uint64_t.
// NOTE(review): this yields a big-endian encoding of `id` only on a little-endian host --
// confirm that is the intended target.
//
// `s` is expected to already hold at least eight bytes; any bytes beyond the first eight
// are left untouched. A shorter `s` is grown to eight bytes rather than written out of bounds.
void computeTenantPrefix(ByteString& s, uint64_t id) {
    const uint64_t swapped = byteswapHelper(id);
    if (s.size() < sizeof(swapped)) {
        s.resize(sizeof(swapped));
    }
    // Copy directly from the integer. Routing the pointer through a BytesRef first is
    // unnecessary, and if BytesRef is a string_view-like type its pointer-only constructor
    // scans for a zero terminator, which can read past the eight bytes of `swapped`.
    memcpy(&s[0], &swapped, sizeof(swapped));
}
/* cleanup database */
int cleanup(Database db, Arguments const& args) {
const auto prefix_len = args.prefixpadding ? args.key_length - args.row_digits : intSize(KEY_PREFIX);
@ -117,12 +135,8 @@ int cleanup(Database db, Arguments const& args) {
auto watch = Stopwatch(StartAtCtor{});
int num_iterations = (args.tenants > 1) ? args.tenants : 1;
for (int i = 0; i < num_iterations; ++i) {
// If args.tenants is zero, this will use a non-tenant txn and perform a single range clear.
// If 1, it will use a tenant txn and do a single range clear instead.
// If > 1, it will perform a range clear with a different tenant txn per iteration.
Transaction tx = createNewTransaction(db, args, i);
Transaction tx = db.createTransaction();
if (args.total_tenants == 0) {
while (true) {
tx.clearRange(beginstr, endstr);
auto future_commit = tx.commit();
@ -136,23 +150,89 @@ int cleanup(Database db, Arguments const& args) {
return -1;
}
}
// If tenants are specified, also delete the tenant after clearing out its keyspace
if (args.tenants > 0) {
Transaction systemTx = db.createTransaction();
while (true) {
Tenant::deleteTenant(systemTx, toBytesRef("tenant" + std::to_string(i)));
auto future_commit = systemTx.commit();
const auto rc = waitAndHandleError(systemTx, future_commit, "DELETE_TENANT");
if (rc == FutureRC::OK) {
break;
} else if (rc == FutureRC::RETRY || rc == FutureRC::CONFLICT) {
// tx already reset
continue;
} else {
return -1;
} else {
int batch_size = args.tenant_batch_size;
int batches = (args.total_tenants + batch_size - 1) / batch_size;
// First loop to clear all tenant key ranges
for (int batch = 0; batch < batches; ++batch) {
fdb::TypedFuture<fdb::future_var::ValueRef> tenantResults[batch_size];
// Issue all tenant reads first
Transaction getTx = db.createTransaction();
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
std::string tenant_name = "tenant" + std::to_string(i);
tenantResults[i - (batch * batch_size)] = Tenant::getTenant(getTx, toBytesRef(tenant_name));
}
tx.setOption(FDBTransactionOption::FDB_TR_OPTION_LOCK_AWARE, BytesRef());
tx.setOption(FDBTransactionOption::FDB_TR_OPTION_RAW_ACCESS, BytesRef());
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
std::string tenant_name = "tenant" + std::to_string(i);
while (true) {
const auto rc = waitAndHandleError(getTx, tenantResults[i - (batch * batch_size)], "GET_TENANT");
if (rc == FutureRC::OK) {
// Read the tenant metadata for the prefix and issue a range clear
if (tenantResults[i - (batch * batch_size)].get().has_value()) {
ByteString val(tenantResults[i - (batch * batch_size)].get().value());
rapidjson::Document doc;
const char* metadata = reinterpret_cast<const char*>(val.c_str());
doc.Parse(metadata);
if (!doc.HasParseError()) {
// rapidjson does not decode the prefix as the same byte string that
// was passed as input. This is because we use a non-standard encoding.
// The encoding will likely change in the future.
// For a workaround, we take the id and compute the prefix on our own
rapidjson::Value& docVal = doc["id"];
uint64_t id = docVal.GetUint64();
ByteString tenantPrefix(8, '\0');
computeTenantPrefix(tenantPrefix, id);
ByteString tenantPrefixEnd = strinc(tenantPrefix);
tx.clearRange(toBytesRef(tenantPrefix), toBytesRef(tenantPrefixEnd));
}
}
break;
} else if (rc == FutureRC::RETRY) {
continue;
} else {
// Abort
return -1;
}
}
}
auto future_commit = tx.commit();
const auto rc = waitAndHandleError(tx, future_commit, "TENANT_COMMIT_CLEANUP");
if (rc == FutureRC::OK) {
// Keep going with reset transaction if commit was successful
tx.reset();
} else if (rc == FutureRC::RETRY) {
// We want to retry this batch, so decrement the number
// and go back through the loop to get the same value
// Transaction is already reset
--batch;
} else {
// Abort
return -1;
}
}
// Second loop to delete the tenants
tx.reset();
for (int batch = 0; batch < batches; ++batch) {
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
std::string tenant_name = "tenant" + std::to_string(i);
Tenant::deleteTenant(tx, toBytesRef(tenant_name));
}
auto future_commit = tx.commit();
const auto rc = waitAndHandleError(tx, future_commit, "DELETE_TENANT");
if (rc == FutureRC::OK) {
// Keep going with reset transaction if commit was successful
tx.reset();
} else if (rc == FutureRC::RETRY) {
// We want to retry this batch, so decrement the number
// and go back through the loop to get the same value
// Transaction is already reset
--batch;
} else {
// Abort
return -1;
}
}
}
@ -168,7 +248,6 @@ int populate(Database db,
int thread_tps,
ThreadStatistics& stats) {
auto xacts = 0;
auto keystr = ByteString{};
auto valstr = ByteString{};
keystr.resize(args.key_length);
@ -180,39 +259,65 @@ int populate(Database db,
auto watch_tx = Stopwatch(watch_total.getStart());
auto watch_trace = Stopwatch(watch_total.getStart());
Transaction systemTx = db.createTransaction();
for (int i = 0; i < args.tenants; ++i) {
while (true) {
// Until this issue https://github.com/apple/foundationdb/issues/7260 is resolved
// we have to commit each tenant creation transaction one-by-one
// while (i % 10 == 9 || i == args.tenants - 1) {
std::string tenant_name = "tenant" + std::to_string(i);
Tenant::createTenant(systemTx, toBytesRef(tenant_name));
auto future_commit = systemTx.commit();
const auto rc = waitAndHandleError(systemTx, future_commit, "CREATE_TENANT");
if (rc == FutureRC::RETRY) {
continue;
} else {
// Keep going if commit was successful (FutureRC::OK)
// If not a retryable error, expected to be the error
// tenant_already_exists, meaning another thread finished creating it
systemTx.reset();
break;
if (args.total_tenants > 0) {
Transaction systemTx = db.createTransaction();
// Have one thread create all the tenants, then let the rest help with data population
if (worker_id == 0 && thread_id == 0) {
int batch_size = args.tenant_batch_size;
int batches = (args.total_tenants + batch_size - 1) / batch_size;
for (int batch = 0; batch < batches; ++batch) {
for (int i = batch * batch_size; i < args.total_tenants && i < (batch + 1) * batch_size; ++i) {
std::string tenant_name = "tenant" + std::to_string(i);
Tenant::createTenant(systemTx, toBytesRef(tenant_name));
}
auto future_commit = systemTx.commit();
const auto rc = waitAndHandleError(systemTx, future_commit, "CREATE_TENANT");
if (rc == FutureRC::OK) {
// Keep going with reset transaction if commit was successful
systemTx.reset();
} else if (rc == FutureRC::RETRY) {
// We want to retry this batch, so decrement the number
// and go back through the loop to get the same value
// Transaction is already reset
--batch;
} else {
// Abort
return -1;
}
}
} else {
std::string last_tenant_name = "tenant" + std::to_string(args.total_tenants - 1);
while (true) {
auto result = Tenant::getTenant(systemTx, toBytesRef(last_tenant_name));
const auto rc = waitAndHandleError(systemTx, result, "GET_TENANT");
if (rc == FutureRC::OK) {
// If we get valid tenant metadata, the main thread has finished
if (result.get().has_value()) {
break;
}
systemTx.reset();
} else if (rc == FutureRC::RETRY) {
continue;
} else {
// Abort
return -1;
}
usleep(1000);
}
}
}
// mimic typical tenant usage: keep tenants in memory
// and create transactions as needed
Tenant tenants[args.tenants];
for (int i = 0; i < args.tenants; ++i) {
Tenant tenants[args.active_tenants];
for (int i = 0; i < args.active_tenants; ++i) {
std::string tenantStr = "tenant" + std::to_string(i);
BytesRef tenant_name = toBytesRef(tenantStr);
tenants[i] = db.openTenant(tenant_name);
}
int populate_iters = args.tenants > 0 ? args.tenants : 1;
int populate_iters = args.active_tenants > 0 ? args.active_tenants : 1;
// Each tenant should have the same range populated
for (auto t_id = 0; t_id < populate_iters; ++t_id) {
Transaction tx = createNewTransaction(db, args, t_id, args.tenants > 0 ? tenants : nullptr);
Transaction tx = createNewTransaction(db, args, t_id, args.active_tenants > 0 ? tenants : nullptr);
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
@ -261,7 +366,7 @@ int populate(Database db,
auto tx_restarter = ExitGuard([&watch_tx]() { watch_tx.startFromStop(); });
if (rc == FutureRC::OK) {
key_checkpoint = i + 1; // restart on failures from next key
tx = createNewTransaction(db, args, t_id, args.tenants > 0 ? tenants : nullptr);
tx = createNewTransaction(db, args, t_id, args.active_tenants > 0 ? tenants : nullptr);
} else if (rc == FutureRC::ABORT) {
return -1;
} else {
@ -440,8 +545,8 @@ int runWorkload(Database db,
// mimic typical tenant usage: keep tenants in memory
// and create transactions as needed
Tenant tenants[args.tenants];
for (int i = 0; i < args.tenants; ++i) {
Tenant tenants[args.active_tenants];
for (int i = 0; i < args.active_tenants; ++i) {
std::string tenantStr = "tenant" + std::to_string(i);
BytesRef tenant_name = toBytesRef(tenantStr);
tenants[i] = db.openTenant(tenant_name);
@ -449,7 +554,7 @@ int runWorkload(Database db,
/* main transaction loop */
while (1) {
Transaction tx = createNewTransaction(db, args, -1, args.tenants > 0 ? tenants : nullptr);
Transaction tx = createNewTransaction(db, args, -1, args.active_tenants > 0 ? tenants : nullptr);
while ((thread_tps > 0) && (xacts >= current_tps)) {
/* throttle on */
const auto time_now = steady_clock::now();
@ -809,7 +914,8 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
this_args.worker_id = worker_id;
this_args.thread_id = i;
this_args.parent_id = pid_main;
this_args.tenants = args.tenants;
this_args.active_tenants = args.active_tenants;
this_args.total_tenants = args.total_tenants;
this_args.args = &args;
this_args.shm = shm;
this_args.database = databases[i % args.num_databases];
@ -878,7 +984,9 @@ int initArguments(Arguments& args) {
args.sampling = 1000;
args.key_length = 32;
args.value_length = 16;
args.tenants = 0;
args.active_tenants = 0;
args.total_tenants = 0;
args.tenant_batch_size = 10000;
args.zipf = 0;
args.commit_get = 0;
args.verbose = 1;
@ -1053,7 +1161,9 @@ void usage() {
printf("%-24s %s\n", "", "This option cannot be specified with --seconds.");
printf("%-24s %s\n", " --keylen=LENGTH", "Specify the key lengths");
printf("%-24s %s\n", " --vallen=LENGTH", "Specify the value lengths");
printf("%-24s %s\n", " --tenants=TENANTS", "Specify the number of tenants to use");
printf("%-24s %s\n", " --active_tenants=ACTIVE_TENANTS", "Specify the number of tenants to use");
printf("%-24s %s\n", " --total_tenants=TOTAL_TENANTS", "Specify the number of tenants to create");
printf("%-24s %s\n", " --tenant_batch_size=SIZE", "Specify how many tenants to create/delete per transaction");
printf("%-24s %s\n", "-x, --transaction=SPEC", "Transaction specification");
printf("%-24s %s\n", " --tps|--tpsmax=TPS", "Specify the target max TPS");
printf("%-24s %s\n", " --tpsmin=TPS", "Specify the target min TPS");
@ -1109,7 +1219,9 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
{ "iteration", required_argument, NULL, 'i' },
{ "keylen", required_argument, NULL, ARG_KEYLEN },
{ "vallen", required_argument, NULL, ARG_VALLEN },
{ "tenants", required_argument, NULL, ARG_TENANTS },
{ "active_tenants", required_argument, NULL, ARG_ACTIVE_TENANTS },
{ "total_tenants", required_argument, NULL, ARG_TOTAL_TENANTS },
{ "tenant_batch_size", required_argument, NULL, ARG_TENANT_BATCH_SIZE },
{ "transaction", required_argument, NULL, 'x' },
{ "tps", required_argument, NULL, ARG_TPS },
{ "tpsmax", required_argument, NULL, ARG_TPSMAX },
@ -1226,8 +1338,14 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
case ARG_VALLEN:
args.value_length = atoi(optarg);
break;
case ARG_TENANTS:
args.tenants = atoi(optarg);
case ARG_ACTIVE_TENANTS:
args.active_tenants = atoi(optarg);
break;
case ARG_TOTAL_TENANTS:
args.total_tenants = atoi(optarg);
break;
case ARG_TENANT_BATCH_SIZE:
args.tenant_batch_size = atoi(optarg);
break;
case ARG_TPS:
case ARG_TPSMAX:
@ -1418,6 +1536,14 @@ int validateArguments(Arguments const& args) {
4 + args.row_digits);
return -1;
}
if (args.active_tenants > args.total_tenants) {
logr.error("--active_tenants must be less than or equal to --total_tenants");
return -1;
}
if (args.tenant_batch_size < 1) {
logr.error("--tenant_batch_size must be at least 1");
return -1;
}
if (args.mode == MODE_RUN) {
if ((args.seconds > 0) && (args.iteration > 0)) {
logr.error("Cannot specify seconds and iteration together");
@ -1984,7 +2110,8 @@ int statsProcessMain(Arguments const& args,
fmt::fprintf(fp, "\"sampling\": %d,", args.sampling);
fmt::fprintf(fp, "\"key_length\": %d,", args.key_length);
fmt::fprintf(fp, "\"value_length\": %d,", args.value_length);
fmt::fprintf(fp, "\"tenants\": %d,", args.tenants);
fmt::fprintf(fp, "\"active_tenants\": %d,", args.active_tenants);
fmt::fprintf(fp, "\"total_tenants\": %d,", args.total_tenants);
fmt::fprintf(fp, "\"commit_get\": %d,", args.commit_get);
fmt::fprintf(fp, "\"verbose\": %d,", args.verbose);
fmt::fprintf(fp, "\"cluster_files\": \"%s\",", args.cluster_files[0]);
@ -2108,11 +2235,16 @@ int main(int argc, char* argv[]) {
/* usage printed */
return 0;
}
if (args.tenants > 1) {
args.rows = args.rows / args.tenants;
if (args.active_tenants > 1) {
args.rows = args.rows / args.active_tenants;
args.row_digits = digits(args.rows);
}
// Allow specifying only the number of active tenants, in which case # active = # total
if (args.active_tenants > 0 && args.total_tenants == 0) {
args.total_tenants = args.active_tenants;
}
rc = validateArguments(args);
if (rc < 0)
return -1;

View File

@ -50,7 +50,9 @@ constexpr const int MODE_REPORT = 3;
enum ArgKind {
ARG_KEYLEN,
ARG_VALLEN,
ARG_TENANTS,
ARG_ACTIVE_TENANTS,
ARG_TOTAL_TENANTS,
ARG_TENANT_BATCH_SIZE,
ARG_TPS,
ARG_ASYNC,
ARG_COMMITGET,
@ -145,7 +147,9 @@ struct Arguments {
int sampling;
int key_length;
int value_length;
int tenants;
int active_tenants;
int total_tenants;
int tenant_batch_size;
int zipf;
int commit_get;
int verbose;