Merge pull request #7387 from sfc-gh-jfu/jfu-mako-tenant-rows
When provided tenants in mako, divide number of rows by number of tenants.
This commit is contained in:
commit
b891b424ea
|
@ -87,7 +87,7 @@ Transaction createNewTransaction(Database db, Arguments const& args, int id = -1
|
|||
}
|
||||
// Create Tenant Transaction
|
||||
int tenant_id = (id == -1) ? urand(0, args.tenants - 1) : id;
|
||||
// If provided tenants array (only necessary in runWorkload), use it
|
||||
// If provided tenants array, use it
|
||||
if (tenants) {
|
||||
return tenants[tenant_id].createTransaction();
|
||||
}
|
||||
|
@ -167,8 +167,6 @@ int populate(Database db,
|
|||
int thread_id,
|
||||
int thread_tps,
|
||||
ThreadStatistics& stats) {
|
||||
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
auto xacts = 0;
|
||||
|
||||
auto keystr = ByteString{};
|
||||
|
@ -181,7 +179,6 @@ int populate(Database db,
|
|||
auto watch_throttle = Stopwatch(watch_total.getStart());
|
||||
auto watch_tx = Stopwatch(watch_total.getStart());
|
||||
auto watch_trace = Stopwatch(watch_total.getStart());
|
||||
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
|
||||
|
||||
Transaction systemTx = db.createTransaction();
|
||||
for (int i = 0; i < args.tenants; ++i) {
|
||||
|
@ -204,77 +201,92 @@ int populate(Database db,
|
|||
}
|
||||
}
|
||||
}
|
||||
Transaction tx = createNewTransaction(db, args);
|
||||
for (auto i = key_begin; i <= key_end; i++) {
|
||||
/* sequential keys */
|
||||
genKey(keystr.data(), KEY_PREFIX, args, i);
|
||||
/* random values */
|
||||
randomString(valstr.data(), args.value_length);
|
||||
|
||||
while (thread_tps > 0 && xacts >= thread_tps /* throttle */) {
|
||||
if (toIntegerSeconds(watch_throttle.stop().diff()) >= 1) {
|
||||
xacts = 0;
|
||||
watch_throttle.startFromStop();
|
||||
} else {
|
||||
usleep(1000);
|
||||
}
|
||||
}
|
||||
if (num_seconds_trace_every) {
|
||||
if (toIntegerSeconds(watch_trace.stop().diff()) >= num_seconds_trace_every) {
|
||||
watch_trace.startFromStop();
|
||||
logr.debug("txn tracing {}", toCharsRef(keystr));
|
||||
auto err = Error{};
|
||||
err = tx.setOptionNothrow(FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER, keystr);
|
||||
if (err) {
|
||||
logr.error("setOption(TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER): {}", err.what());
|
||||
}
|
||||
err = tx.setOptionNothrow(FDB_TR_OPTION_LOG_TRANSACTION, BytesRef());
|
||||
if (err) {
|
||||
logr.error("setOption(TR_OPTION_LOG_TRANSACTION): {}", err.what());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* insert (SET) */
|
||||
tx.set(keystr, valstr);
|
||||
stats.incrOpCount(OP_INSERT);
|
||||
|
||||
/* commit every 100 inserts (default) or if this is the last key */
|
||||
if ((i % num_commit_every == 0) || i == key_end) {
|
||||
const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0;
|
||||
auto watch_commit = Stopwatch(StartAtCtor{});
|
||||
auto future_commit = tx.commit();
|
||||
const auto rc = waitAndHandleError(tx, future_commit, "COMMIT_POPULATE_INSERT");
|
||||
watch_commit.stop();
|
||||
watch_tx.setStop(watch_commit.getStop());
|
||||
auto tx_restarter = ExitGuard([&watch_tx]() { watch_tx.startFromStop(); });
|
||||
if (rc == FutureRC::OK) {
|
||||
key_checkpoint = i + 1; // restart on failures from next key
|
||||
tx = createNewTransaction(db, args);
|
||||
} else if (rc == FutureRC::ABORT) {
|
||||
return -1;
|
||||
} else {
|
||||
i = key_checkpoint - 1; // restart from last committed
|
||||
continue;
|
||||
}
|
||||
/* xact latency stats */
|
||||
if (do_sample) {
|
||||
const auto commit_latency = watch_commit.diff();
|
||||
const auto tx_duration = watch_tx.diff();
|
||||
stats.addLatency(OP_COMMIT, commit_latency);
|
||||
stats.addLatency(OP_TRANSACTION, tx_duration);
|
||||
}
|
||||
stats.incrOpCount(OP_COMMIT);
|
||||
stats.incrOpCount(OP_TRANSACTION);
|
||||
|
||||
xacts++; /* for throttling */
|
||||
}
|
||||
// mimic typical tenant usage: keep tenants in memory
|
||||
// and create transactions as needed
|
||||
Tenant tenants[args.tenants];
|
||||
for (int i = 0; i < args.tenants; ++i) {
|
||||
std::string tenantStr = "tenant" + std::to_string(i);
|
||||
BytesRef tenant_name = toBytesRef(tenantStr);
|
||||
tenants[i] = db.openTenant(tenant_name);
|
||||
}
|
||||
int populate_iters = args.tenants > 0 ? args.tenants : 1;
|
||||
// Each tenant should have the same range populated
|
||||
for (auto t_id = 0; t_id < populate_iters; ++t_id) {
|
||||
Transaction tx = createNewTransaction(db, args, t_id, args.tenants > 0 ? tenants : nullptr);
|
||||
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
|
||||
for (auto i = key_begin; i <= key_end; i++) {
|
||||
/* sequential keys */
|
||||
genKey(keystr.data(), KEY_PREFIX, args, i);
|
||||
/* random values */
|
||||
randomString(valstr.data(), args.value_length);
|
||||
|
||||
while (thread_tps > 0 && xacts >= thread_tps /* throttle */) {
|
||||
if (toIntegerSeconds(watch_throttle.stop().diff()) >= 1) {
|
||||
xacts = 0;
|
||||
watch_throttle.startFromStop();
|
||||
} else {
|
||||
usleep(1000);
|
||||
}
|
||||
}
|
||||
if (num_seconds_trace_every) {
|
||||
if (toIntegerSeconds(watch_trace.stop().diff()) >= num_seconds_trace_every) {
|
||||
watch_trace.startFromStop();
|
||||
logr.debug("txn tracing {}", toCharsRef(keystr));
|
||||
auto err = Error{};
|
||||
err = tx.setOptionNothrow(FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER, keystr);
|
||||
if (err) {
|
||||
logr.error("setOption(TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER): {}", err.what());
|
||||
}
|
||||
err = tx.setOptionNothrow(FDB_TR_OPTION_LOG_TRANSACTION, BytesRef());
|
||||
if (err) {
|
||||
logr.error("setOption(TR_OPTION_LOG_TRANSACTION): {}", err.what());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* insert (SET) */
|
||||
tx.set(keystr, valstr);
|
||||
stats.incrOpCount(OP_INSERT);
|
||||
|
||||
/* commit every 100 inserts (default) or if this is the last key */
|
||||
if ((i % num_commit_every == 0) || i == key_end) {
|
||||
const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0;
|
||||
auto watch_commit = Stopwatch(StartAtCtor{});
|
||||
auto future_commit = tx.commit();
|
||||
const auto rc = waitAndHandleError(tx, future_commit, "COMMIT_POPULATE_INSERT");
|
||||
watch_commit.stop();
|
||||
watch_tx.setStop(watch_commit.getStop());
|
||||
auto tx_restarter = ExitGuard([&watch_tx]() { watch_tx.startFromStop(); });
|
||||
if (rc == FutureRC::OK) {
|
||||
key_checkpoint = i + 1; // restart on failures from next key
|
||||
tx = createNewTransaction(db, args, t_id, args.tenants > 0 ? tenants : nullptr);
|
||||
} else if (rc == FutureRC::ABORT) {
|
||||
return -1;
|
||||
} else {
|
||||
i = key_checkpoint - 1; // restart from last committed
|
||||
continue;
|
||||
}
|
||||
/* xact latency stats */
|
||||
if (do_sample) {
|
||||
const auto commit_latency = watch_commit.diff();
|
||||
const auto tx_duration = watch_tx.diff();
|
||||
stats.addLatency(OP_COMMIT, commit_latency);
|
||||
stats.addLatency(OP_TRANSACTION, tx_duration);
|
||||
}
|
||||
stats.incrOpCount(OP_COMMIT);
|
||||
stats.incrOpCount(OP_TRANSACTION);
|
||||
|
||||
xacts++; /* for throttling */
|
||||
}
|
||||
}
|
||||
logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
|
||||
key_end - key_begin + 1,
|
||||
key_begin,
|
||||
key_end,
|
||||
toDoubleSeconds(watch_total.stop().diff()));
|
||||
}
|
||||
logr.debug("Populated {} rows [{}, {}]: {:6.3f} sec",
|
||||
key_end - key_begin + 1,
|
||||
key_begin,
|
||||
key_end,
|
||||
toDoubleSeconds(watch_total.stop().diff()));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -2096,6 +2108,10 @@ int main(int argc, char* argv[]) {
|
|||
/* usage printed */
|
||||
return 0;
|
||||
}
|
||||
if (args.tenants > 1) {
|
||||
args.rows = args.rows / args.tenants;
|
||||
args.row_digits = digits(args.rows);
|
||||
}
|
||||
|
||||
rc = validateArguments(args);
|
||||
if (rc < 0)
|
||||
|
|
Loading…
Reference in New Issue