From 278009e1c897b6b920153f1132b4c577aaad52e8 Mon Sep 17 00:00:00 2001 From: Junhyun Shim Date: Wed, 20 Apr 2022 04:27:16 +0200 Subject: [PATCH] Let OP_TRANSACTION to measure one iteration of '-x' --- bindings/c/test/mako/async.cpp | 43 ++++++----------- bindings/c/test/mako/async.hpp | 5 +- bindings/c/test/mako/mako.cpp | 73 +++++++++++------------------ bindings/c/test/mako/mako.hpp | 3 +- bindings/c/test/mako/operations.cpp | 1 - bindings/c/test/mako/operations.hpp | 2 +- 6 files changed, 46 insertions(+), 81 deletions(-) diff --git a/bindings/c/test/mako/async.cpp b/bindings/c/test/mako/async.cpp index 14077f8735..90f4090655 100644 --- a/bindings/c/test/mako/async.cpp +++ b/bindings/c/test/mako/async.cpp @@ -65,19 +65,16 @@ void ResumableStateForPopulate::runOneTick() { // successfully committed watch_commit.stop(); watch_tx.setStop(watch_commit.getStop()); - if (stats.getOpCount(OP_TASK) % args.sampling == 0) { + if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) { const auto commit_latency = watch_commit.diff(); const auto tx_duration = watch_tx.diff(); stats.addLatency(OP_COMMIT, commit_latency); stats.addLatency(OP_TRANSACTION, tx_duration); - stats.addLatency(OP_TASK, tx_duration); sample_bins[OP_COMMIT].put(commit_latency); sample_bins[OP_TRANSACTION].put(tx_duration); - sample_bins[OP_TASK].put(tx_duration); } stats.incrOpCount(OP_COMMIT); stats.incrOpCount(OP_TRANSACTION); - stats.incrOpCount(OP_TASK); tx.reset(); watch_tx.startFromStop(); key_checkpoint = i + 1; @@ -119,7 +116,7 @@ repeat_immediate_steps: updateStepStats(); iter = getOpNext(args, iter); if (iter == OpEnd) - onTaskSuccess(); + onTransactionSuccess(); else goto repeat_immediate_steps; } else { @@ -156,7 +153,7 @@ repeat_immediate_steps: updateStepStats(); iter = getOpNext(args, iter); if (iter == OpEnd) { - onTaskSuccess(); + onTransactionSuccess(); } else { postNextTick(); } @@ -186,22 +183,16 @@ void ResumableStateForRunWorkload::updateStepStats() { logr.debug("Step {}:{} succeeded", iter.opName(), iter.step); // step successful watch_step.stop(); - const auto do_sample = stats.getOpCount(OP_TASK) % args.sampling == 0; + const auto do_sample = stats.getOpCount(OP_TRANSACTION) % args.sampling == 0; if (iter.stepKind() == StepKind::COMMIT) { // reset transaction boundary const auto step_latency = watch_step.diff(); - watch_tx.setStop(watch_step.getStop()); if (do_sample) { - const auto tx_duration = watch_tx.diff(); stats.addLatency(OP_COMMIT, step_latency); - stats.addLatency(OP_TRANSACTION, tx_duration); sample_bins[OP_COMMIT].put(step_latency); - sample_bins[OP_TRANSACTION].put(tx_duration); } tx.reset(); - watch_tx.startFromStop(); stats.incrOpCount(OP_COMMIT); - stats.incrOpCount(OP_TRANSACTION); needs_commit = false; } // op completed successfully @@ -218,7 +209,7 @@ void ResumableStateForRunWorkload::updateStepStats() { } } -void ResumableStateForRunWorkload::onTaskSuccess() { +void ResumableStateForRunWorkload::onTransactionSuccess() { if (needs_commit || args.commit_get) { // task completed, need to commit before finish watch_commit.start(); @@ -251,24 +242,18 @@ void ResumableStateForRunWorkload::onTaskSuccess() { // commit successful watch_commit.stop(); watch_tx.setStop(watch_commit.getStop()); - watch_task.setStop(watch_commit.getStop()); - if (stats.getOpCount(OP_TASK) % args.sampling == 0) { + if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) { const auto commit_latency = watch_commit.diff(); const auto tx_duration = watch_tx.diff(); - const auto task_duration = watch_task.diff(); stats.addLatency(OP_COMMIT, commit_latency); stats.addLatency(OP_TRANSACTION, commit_latency); - stats.addLatency(OP_TASK, task_duration); sample_bins[OP_COMMIT].put(commit_latency); sample_bins[OP_TRANSACTION].put(tx_duration); - sample_bins[OP_TASK].put(task_duration); } stats.incrOpCount(OP_COMMIT); stats.incrOpCount(OP_TRANSACTION); - stats.incrOpCount(OP_TASK); tx.reset(); watch_tx.startFromStop(); - watch_task.startFromStop(); if (ended()) { signalEnd(); } else { @@ -279,15 +264,15 @@ void ResumableStateForRunWorkload::onTaskSuccess() { } }); } else { - // task completed but no need to commit - watch_task.stop(); - if (stats.getOpCount(OP_TASK) % args.sampling == 0) { - const auto task_duration = watch_task.diff(); - stats.addLatency(OP_TASK, task_duration); - sample_bins[OP_TASK].put(task_duration); + // transaction completed but no need to commit + watch_tx.stop(); + if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) { + const auto tx_duration = watch_tx.diff(); + stats.addLatency(OP_TRANSACTION, tx_duration); + sample_bins[OP_TRANSACTION].put(tx_duration); } - stats.incrOpCount(OP_TASK); - watch_task.startFromStop(); + stats.incrOpCount(OP_TRANSACTION); + watch_tx.startFromStop(); tx.reset(); if (ended()) { signalEnd(); diff --git a/bindings/c/test/mako/async.hpp b/bindings/c/test/mako/async.hpp index 7aea0120e2..a5025202d8 100644 --- a/bindings/c/test/mako/async.hpp +++ b/bindings/c/test/mako/async.hpp @@ -91,7 +91,6 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this= stats.getOpCount(OP_TASK)) || signal.load() == SIGNAL_RED; + return (max_iters != -1 && max_iters >= stats.getOpCount(OP_TRANSACTION)) || signal.load() == SIGNAL_RED; } void postNextTick(); void runOneTick(); void updateStepStats(); - void onTaskSuccess(); + void onTransactionSuccess(); }; using RunWorkloadStateHandle = std::shared_ptr; diff --git a/bindings/c/test/mako/mako.cpp b/bindings/c/test/mako/mako.cpp index 18e775c2f6..865e12c329 100644 --- a/bindings/c/test/mako/mako.cpp +++ b/bindings/c/test/mako/mako.cpp @@ -179,7 +179,7 @@ int populate(Transaction tx, /* commit every 100 inserts (default) or if this is the last key */ if (i == key_end || (i - key_begin + 1) % num_commit_every == 0) { - const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0; + const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0; auto watch_commit = Stopwatch(StartAtCtor{}); auto future_commit = tx.commit(); const auto rc = waitAndHandleError(tx, future_commit, "COMMIT_POPULATE_INSERT"); @@ -207,7 +207,6 @@ int populate(Transaction tx, stats.incrOpCount(OP_COMMIT); stats.incrOpCount(OP_TRANSACTION); - stats.incrOpCount(OP_TASK); xacts++; /* for throttling */ } } @@ -220,22 +219,21 @@ int populate(Transaction tx, return 0; } -/* run one iteration of configured task */ -int runOneTask(Transaction& tx, - Arguments const& args, - ThreadStatistics& stats, - LatencySampleBinArray& sample_bins, - ByteString& key1, - ByteString& key2, - ByteString& val) { - const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0; - auto watch_task = Stopwatch(StartAtCtor{}); - auto watch_tx = Stopwatch(watch_task.getStart()); +/* run one iteration of configured transaction */ +int runOneTransaction(Transaction& tx, + Arguments const& args, + ThreadStatistics& stats, + LatencySampleBinArray& sample_bins, + ByteString& key1, + ByteString& key2, + ByteString& val) { + const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0; + auto watch_tx = Stopwatch(StartAtCtor{}); auto watch_op = Stopwatch{}; auto op_iter = getOpBegin(args); auto needs_commit = false; -task_begin: +transaction_begin: while (op_iter != OpEnd) { const auto [op, count, step] = op_iter; const auto step_kind = opTable[op].stepKind(step); @@ -274,17 +272,11 @@ task_begin: // reset transaction boundary if (do_sample) { const auto step_latency = watch_step.diff(); - watch_tx.setStop(watch_step.getStop()); - const auto tx_duration = watch_tx.diff(); stats.addLatency(OP_COMMIT, step_latency); - stats.addLatency(OP_TRANSACTION, tx_duration); sample_bins[OP_COMMIT].put(step_latency); - sample_bins[OP_TRANSACTION].put(tx_duration); - watch_tx.startFromStop(); // new tx begins } tx.reset(); stats.incrOpCount(OP_COMMIT); - stats.incrOpCount(OP_TRANSACTION); needs_commit = false; } @@ -309,22 +301,14 @@ task_begin: auto f = tx.commit(); const auto rc = waitAndHandleError(tx, f, "COMMIT_AT_TX_END"); watch_commit.stop(); - watch_tx.setStop(watch_commit.getStop()); - auto tx_resetter = ExitGuard([&watch_tx, &tx]() { - tx.reset(); - watch_tx.startFromStop(); - }); + auto tx_resetter = ExitGuard([&tx]() { tx.reset(); }); if (rc == FutureRC::OK) { if (do_sample) { const auto commit_latency = watch_commit.diff(); - const auto tx_duration = watch_tx.diff(); stats.addLatency(OP_COMMIT, commit_latency); - stats.addLatency(OP_TRANSACTION, tx_duration); sample_bins[OP_COMMIT].put(commit_latency); - sample_bins[OP_TRANSACTION].put(tx_duration); } stats.incrOpCount(OP_COMMIT); - stats.incrOpCount(OP_TRANSACTION); } else { if (rc == FutureRC::CONFLICT) stats.incrConflictCount(); @@ -335,16 +319,16 @@ task_begin: } // restart from beginning op_iter = getOpBegin(args); - goto task_begin; + goto transaction_begin; } } - // one task iteration has completed successfully + // one transaction has completed successfully if (do_sample) { - const auto task_duration = watch_task.stop().diff(); - sample_bins[OP_TASK].put(task_duration); - stats.addLatency(OP_TASK, task_duration); + const auto tx_duration = watch_tx.stop().diff(); + sample_bins[OP_TRANSACTION].put(tx_duration); + stats.addLatency(OP_TRANSACTION, tx_duration); } - stats.incrOpCount(OP_TASK); + stats.incrOpCount(OP_TRANSACTION); /* make sure to reset transaction */ tx.reset(); return 0; @@ -439,13 +423,13 @@ int runWorkload(Transaction tx, } } - rc = runOneTask(tx, args, stats, sample_bins, key1, key2, val); + rc = runOneTransaction(tx, args, stats, sample_bins, key1, key2, val); if (rc) { - logr.warn("runOneTask failed ({})", rc); + logr.warn("runOneTransaction failed ({})", rc); } if (thread_iters != -1) { - if (thread_iters >= xacts) { + if (thread_iters >= total_xacts) { /* xact limit reached */ break; } @@ -552,7 +536,6 @@ void runAsyncWorkload(Arguments const& args, max_iters, getOpBegin(args)); states[i] = state; - state->watch_task.start(); state->watch_tx.start(); } while (shm.headerConst().signal.load() != SIGNAL_GREEN) @@ -1356,7 +1339,7 @@ void printStats(Arguments const& args, ThreadStatistics const* stats, double con } } /* TPS */ - const auto tps = (current.getOpCount(OP_TASK) - prev.getOpCount(OP_TASK)) / duration_sec; + const auto tps = (current.getOpCount(OP_TRANSACTION) - prev.getOpCount(OP_TRANSACTION)) / duration_sec; putFieldFloat(tps, 2); if (fp) { fprintf(fp, "\"tps\": %.2f,", tps); @@ -1473,9 +1456,9 @@ void printReport(Arguments const& args, break; } } - const auto tps_f = final_stats.getOpCount(OP_TASK) / duration_sec; + const auto tps_f = final_stats.getOpCount(OP_TRANSACTION) / duration_sec; const auto tps_i = static_cast(tps_f); - fmt::printf("Total Xacts: %8lu\n", final_stats.getOpCount(OP_TASK)); + fmt::printf("Total Xacts: %8lu\n", final_stats.getOpCount(OP_TRANSACTION)); fmt::printf("Total Conflicts: %8lu\n", final_stats.getConflictCount()); fmt::printf("Total Errors: %8lu\n", final_stats.getTotalErrorCount()); fmt::printf("Overall TPS: %8lu\n\n", tps_i); @@ -1487,7 +1470,7 @@ void printReport(Arguments const& args, fmt::fprintf(fp, "\"totalThreads\": %d,", args.num_threads); fmt::fprintf(fp, "\"totalAsyncXacts\": %d,", args.async_xacts); fmt::fprintf(fp, "\"targetTPS\": %d,", args.tpsmax); - fmt::fprintf(fp, "\"totalXacts\": %lu,", final_stats.getOpCount(OP_TASK)); + fmt::fprintf(fp, "\"totalXacts\": %lu,", final_stats.getOpCount(OP_TRANSACTION)); fmt::fprintf(fp, "\"totalConflicts\": %lu,", final_stats.getConflictCount()); fmt::fprintf(fp, "\"totalErrors\": %lu,", final_stats.getTotalErrorCount()); fmt::fprintf(fp, "\"overallTPS\": %lu,", tps_i); @@ -1503,7 +1486,7 @@ void printReport(Arguments const& args, } auto first_op = true; for (auto op = 0; op < MAX_OP; op++) { - if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TASK && op != OP_TRANSACTION) || op == OP_COMMIT) { + if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TRANSACTION) || op == OP_COMMIT) { putField(final_stats.getOpCount(op)); if (fp) { if (first_op) { @@ -1517,7 +1500,7 @@ void printReport(Arguments const& args, } /* TPS */ - const auto tps = final_stats.getOpCount(OP_TASK) / duration_sec; + const auto tps = final_stats.getOpCount(OP_TRANSACTION) / duration_sec; putFieldFloat(tps, 2); /* Conflicts */ diff --git a/bindings/c/test/mako/mako.hpp b/bindings/c/test/mako/mako.hpp index 4c56f33676..7c9c902b25 100644 --- a/bindings/c/test/mako/mako.hpp +++ b/bindings/c/test/mako/mako.hpp @@ -97,8 +97,7 @@ enum OpKind { OP_CLEARRANGE, OP_SETCLEARRANGE, OP_COMMIT, - OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */ - OP_TASK, /* pseudo-operation - cumulative time for each iteraton in runWorkload */ + OP_TRANSACTION, /* pseudo-operation - time it takes to run one iteration of ops sequence */ OP_READ_BG, MAX_OP /* must be the last item */ }; diff --git a/bindings/c/test/mako/operations.cpp b/bindings/c/test/mako/operations.cpp index 34817dc4d7..46b34e8b50 100644 --- a/bindings/c/test/mako/operations.cpp +++ b/bindings/c/test/mako/operations.cpp @@ -275,7 +275,6 @@ const std::array opTable{ true }, { "COMMIT", { { StepKind::NONE, nullptr } }, 0, false }, { "TRANSACTION", { { StepKind::NONE, nullptr } }, 0, false }, - { "TASK", { { StepKind::NONE, nullptr } }, 0, false }, { "READBLOBGRANULE", { { StepKind::ON_ERROR, [](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) { diff --git a/bindings/c/test/mako/operations.hpp b/bindings/c/test/mako/operations.hpp index d973377aed..e029724de3 100644 --- a/bindings/c/test/mako/operations.hpp +++ b/bindings/c/test/mako/operations.hpp @@ -48,7 +48,7 @@ enum class StepKind { // Ops that doesn't have concrete steps to execute and are there for measurements only force_inline bool isAbstractOp(int op) noexcept { - return op == OP_COMMIT || op == OP_TRANSACTION; // || op == OP_TASK; + return op == OP_COMMIT || op == OP_TRANSACTION; } using StepFunction = fdb::Future (*)(fdb::Transaction& tx,