Perf-optimize synchronous execution path

- Wrap timer ops into if blocks
- Pull key/value buffer out to upper level function
This commit is contained in:
Junhyun Shim 2022-04-15 02:01:31 +02:00
parent a994588e72
commit e74a676e26
1 changed files with 50 additions and 31 deletions

View File

@ -200,30 +200,34 @@ int populate(Transaction tx,
return 0;
}
/* run one transaction */
int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, LatencySampleBinArray& sample_bins) {
// reuse memory for keys to avoid realloc overhead
auto key1 = ByteString{};
key1.reserve(args.key_length);
auto key2 = ByteString{};
key2.reserve(args.key_length);
auto val = ByteString{};
val.reserve(args.value_length);
auto watch_tx = Stopwatch(StartAtCtor{});
auto watch_task = Stopwatch(watch_tx.getStart());
/* run one iteration of configured task */
int runOneTask(Transaction tx,
Arguments const& args,
ThreadStatistics& stats,
LatencySampleBinArray& sample_bins,
ByteString& key1,
ByteString& key2,
ByteString& val) {
const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0;
auto watch_task = Stopwatch{};
auto watch_tx = Stopwatch{};
auto watch_op = Stopwatch{};
if (do_sample) {
watch_tx.start();
watch_task = Stopwatch(watch_tx.getStart());
}
auto op_iter = getOpBegin(args);
auto needs_commit = false;
auto watch_per_op = std::array<Stopwatch, MAX_OP>{};
const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0;
while (op_iter != OpEnd) {
const auto [op, count, step] = op_iter;
const auto step_kind = opTable[op].stepKind(step);
auto watch_step = Stopwatch{};
watch_step.start();
if (step == 0 /* first step */)
watch_per_op[op] = Stopwatch(watch_step.getStart());
if (do_sample) {
watch_step.start();
if (step == 0 /* first step */)
watch_op = Stopwatch(watch_step.getStart());
}
auto f = opTable[op].stepFunction(step)(tx, args, key1, key2, val);
auto future_rc = FutureRC::OK;
if (f) {
@ -235,7 +239,8 @@ int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, L
}
if (auto postStepFn = opTable[op].postStepFunction(step))
postStepFn(f, tx, args, key1, key2, val);
watch_step.stop();
if (do_sample)
watch_step.stop();
if (future_rc != FutureRC::OK) {
if (future_rc == FutureRC::CONFLICT) {
stats.incrConflictCount();
@ -254,17 +259,17 @@ int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, L
// step successful
if (step_kind == StepKind::COMMIT) {
// reset transaction boundary
const auto step_latency = watch_step.diff();
watch_tx.setStop(watch_step.getStop());
if (do_sample) {
const auto step_latency = watch_step.diff();
watch_tx.setStop(watch_step.getStop());
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, step_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(step_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
watch_tx.startFromStop(); // new tx begins
}
tx.reset();
watch_tx.startFromStop(); // new tx begins
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
needs_commit = false;
@ -274,9 +279,9 @@ int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, L
if (step + 1 == opTable[op].steps() /* last step */) {
if (opTable[op].needsCommit())
needs_commit = true;
watch_per_op[op].setStop(watch_step.getStop());
if (do_sample) {
const auto op_latency = watch_per_op[op].diff();
watch_op.setStop(watch_step.getStop());
const auto op_latency = watch_op.diff();
stats.addLatency(op, op_latency);
sample_bins[op].put(op_latency);
}
@ -287,14 +292,19 @@ int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, L
// reached the end?
if (op_iter == OpEnd && (needs_commit || args.commit_get)) {
auto watch_commit = Stopwatch(StartAtCtor{});
auto watch_commit = Stopwatch();
if (do_sample)
watch_commit.start();
auto f = tx.commit();
const auto rc = waitAndHandleError(tx, f, "COMMIT_AT_TX_END");
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
auto tx_resetter = ExitGuard([&watch_tx, &tx]() {
if (do_sample) {
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
}
auto tx_resetter = ExitGuard([&do_sample, &watch_tx, &tx]() {
tx.reset();
watch_tx.startFromStop();
if (do_sample)
watch_tx.startFromStop();
});
if (rc == FutureRC::OK) {
if (do_sample) {
@ -322,8 +332,8 @@ int runOneTask(Transaction tx, Arguments const& args, ThreadStatistics& stats, L
}
}
// one task iteration has completed successfully
const auto task_duration = watch_task.stop().diff();
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
if (do_sample) {
const auto task_duration = watch_task.stop().diff();
sample_bins[OP_TASK].put(task_duration);
stats.addLatency(OP_TASK, task_duration);
}
@ -363,6 +373,15 @@ int runWorkload(Transaction tx,
auto rc = 0;
auto xacts = 0;
auto total_xacts = int64_t{};
// reuse memory for keys to avoid realloc overhead
auto key1 = ByteString{};
key1.reserve(args.key_length);
auto key2 = ByteString{};
key2.reserve(args.key_length);
auto val = ByteString{};
val.reserve(args.value_length);
/* main transaction loop */
while (1) {
while ((thread_tps > 0) && (xacts >= current_tps)) {
@ -413,7 +432,7 @@ int runWorkload(Transaction tx,
}
}
rc = runOneTask(tx, args, stats, sample_bins);
rc = runOneTask(tx, args, stats, sample_bins, key1, key2, val);
if (rc) {
logr.warn("runOneTask failed ({})", rc);
}