Let OP_TRANSACTION to measure one iteration of '-x'

This commit is contained in:
Junhyun Shim 2022-04-20 04:27:16 +02:00
parent 0cf1349c37
commit 278009e1c8
6 changed files with 46 additions and 81 deletions

View File

@ -65,19 +65,16 @@ void ResumableStateForPopulate::runOneTick() {
// successfully committed
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
stats.addLatency(OP_TASK, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
sample_bins[OP_TASK].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
tx.reset();
watch_tx.startFromStop();
key_checkpoint = i + 1;
@ -119,7 +116,7 @@ repeat_immediate_steps:
updateStepStats();
iter = getOpNext(args, iter);
if (iter == OpEnd)
onTaskSuccess();
onTransactionSuccess();
else
goto repeat_immediate_steps;
} else {
@ -156,7 +153,7 @@ repeat_immediate_steps:
updateStepStats();
iter = getOpNext(args, iter);
if (iter == OpEnd) {
onTaskSuccess();
onTransactionSuccess();
} else {
postNextTick();
}
@ -186,22 +183,16 @@ void ResumableStateForRunWorkload::updateStepStats() {
logr.debug("Step {}:{} succeeded", iter.opName(), iter.step);
// step successful
watch_step.stop();
const auto do_sample = stats.getOpCount(OP_TASK) % args.sampling == 0;
const auto do_sample = stats.getOpCount(OP_TRANSACTION) % args.sampling == 0;
if (iter.stepKind() == StepKind::COMMIT) {
// reset transaction boundary
const auto step_latency = watch_step.diff();
watch_tx.setStop(watch_step.getStop());
if (do_sample) {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, step_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(step_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
tx.reset();
watch_tx.startFromStop();
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
needs_commit = false;
}
// op completed successfully
@ -218,7 +209,7 @@ void ResumableStateForRunWorkload::updateStepStats() {
}
}
void ResumableStateForRunWorkload::onTaskSuccess() {
void ResumableStateForRunWorkload::onTransactionSuccess() {
if (needs_commit || args.commit_get) {
// task completed, need to commit before finish
watch_commit.start();
@ -251,24 +242,18 @@ void ResumableStateForRunWorkload::onTaskSuccess() {
// commit successful
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
watch_task.setStop(watch_commit.getStop());
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
const auto task_duration = watch_task.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, commit_latency);
stats.addLatency(OP_TASK, task_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
sample_bins[OP_TASK].put(task_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
tx.reset();
watch_tx.startFromStop();
watch_task.startFromStop();
if (ended()) {
signalEnd();
} else {
@ -279,15 +264,15 @@ void ResumableStateForRunWorkload::onTaskSuccess() {
}
});
} else {
// task completed but no need to commit
watch_task.stop();
if (stats.getOpCount(OP_TASK) % args.sampling == 0) {
const auto task_duration = watch_task.diff();
stats.addLatency(OP_TASK, task_duration);
sample_bins[OP_TASK].put(task_duration);
// transaction completed but no need to commit
watch_tx.stop();
if (stats.getOpCount(OP_TRANSACTION) % args.sampling == 0) {
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incrOpCount(OP_TASK);
watch_task.startFromStop();
stats.incrOpCount(OP_TRANSACTION);
watch_tx.startFromStop();
tx.reset();
if (ended()) {
signalEnd();

View File

@ -91,7 +91,6 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStat
Stopwatch watch_op;
Stopwatch watch_commit;
Stopwatch watch_tx;
Stopwatch watch_task;
bool needs_commit;
ResumableStateForRunWorkload(Logger logr,
@ -112,12 +111,12 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStat
}
void signalEnd() noexcept { stopcount.fetch_add(1); }
bool ended() noexcept {
return (max_iters != -1 && max_iters >= stats.getOpCount(OP_TASK)) || signal.load() == SIGNAL_RED;
return (max_iters != -1 && max_iters >= stats.getOpCount(OP_TRANSACTION)) || signal.load() == SIGNAL_RED;
}
void postNextTick();
void runOneTick();
void updateStepStats();
void onTaskSuccess();
void onTransactionSuccess();
};
using RunWorkloadStateHandle = std::shared_ptr<ResumableStateForRunWorkload>;

View File

@ -179,7 +179,7 @@ int populate(Transaction tx,
/* commit every 100 inserts (default) or if this is the last key */
if (i == key_end || (i - key_begin + 1) % num_commit_every == 0) {
const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0;
const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0;
auto watch_commit = Stopwatch(StartAtCtor{});
auto future_commit = tx.commit();
const auto rc = waitAndHandleError(tx, future_commit, "COMMIT_POPULATE_INSERT");
@ -207,7 +207,6 @@ int populate(Transaction tx,
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
stats.incrOpCount(OP_TASK);
xacts++; /* for throttling */
}
}
@ -220,22 +219,21 @@ int populate(Transaction tx,
return 0;
}
/* run one iteration of configured task */
int runOneTask(Transaction& tx,
Arguments const& args,
ThreadStatistics& stats,
LatencySampleBinArray& sample_bins,
ByteString& key1,
ByteString& key2,
ByteString& val) {
const auto do_sample = (stats.getOpCount(OP_TASK) % args.sampling) == 0;
auto watch_task = Stopwatch(StartAtCtor{});
auto watch_tx = Stopwatch(watch_task.getStart());
/* run one iteration of configured transaction */
int runOneTransaction(Transaction& tx,
Arguments const& args,
ThreadStatistics& stats,
LatencySampleBinArray& sample_bins,
ByteString& key1,
ByteString& key2,
ByteString& val) {
const auto do_sample = (stats.getOpCount(OP_TRANSACTION) % args.sampling) == 0;
auto watch_tx = Stopwatch(StartAtCtor{});
auto watch_op = Stopwatch{};
auto op_iter = getOpBegin(args);
auto needs_commit = false;
task_begin:
transaction_begin:
while (op_iter != OpEnd) {
const auto [op, count, step] = op_iter;
const auto step_kind = opTable[op].stepKind(step);
@ -274,17 +272,11 @@ task_begin:
// reset transaction boundary
if (do_sample) {
const auto step_latency = watch_step.diff();
watch_tx.setStop(watch_step.getStop());
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, step_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(step_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
watch_tx.startFromStop(); // new tx begins
}
tx.reset();
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
needs_commit = false;
}
@ -309,22 +301,14 @@ task_begin:
auto f = tx.commit();
const auto rc = waitAndHandleError(tx, f, "COMMIT_AT_TX_END");
watch_commit.stop();
watch_tx.setStop(watch_commit.getStop());
auto tx_resetter = ExitGuard([&watch_tx, &tx]() {
tx.reset();
watch_tx.startFromStop();
});
auto tx_resetter = ExitGuard([&tx]() { tx.reset(); });
if (rc == FutureRC::OK) {
if (do_sample) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.addLatency(OP_COMMIT, commit_latency);
stats.addLatency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incrOpCount(OP_COMMIT);
stats.incrOpCount(OP_TRANSACTION);
} else {
if (rc == FutureRC::CONFLICT)
stats.incrConflictCount();
@ -335,16 +319,16 @@ task_begin:
}
// restart from beginning
op_iter = getOpBegin(args);
goto task_begin;
goto transaction_begin;
}
}
// one task iteration has completed successfully
// one transaction has completed successfully
if (do_sample) {
const auto task_duration = watch_task.stop().diff();
sample_bins[OP_TASK].put(task_duration);
stats.addLatency(OP_TASK, task_duration);
const auto tx_duration = watch_tx.stop().diff();
sample_bins[OP_TRANSACTION].put(tx_duration);
stats.addLatency(OP_TRANSACTION, tx_duration);
}
stats.incrOpCount(OP_TASK);
stats.incrOpCount(OP_TRANSACTION);
/* make sure to reset transaction */
tx.reset();
return 0;
@ -439,13 +423,13 @@ int runWorkload(Transaction tx,
}
}
rc = runOneTask(tx, args, stats, sample_bins, key1, key2, val);
rc = runOneTransaction(tx, args, stats, sample_bins, key1, key2, val);
if (rc) {
logr.warn("runOneTask failed ({})", rc);
logr.warn("runOneTransaction failed ({})", rc);
}
if (thread_iters != -1) {
if (thread_iters >= xacts) {
if (thread_iters >= total_xacts) {
/* xact limit reached */
break;
}
@ -552,7 +536,6 @@ void runAsyncWorkload(Arguments const& args,
max_iters,
getOpBegin(args));
states[i] = state;
state->watch_task.start();
state->watch_tx.start();
}
while (shm.headerConst().signal.load() != SIGNAL_GREEN)
@ -1356,7 +1339,7 @@ void printStats(Arguments const& args, ThreadStatistics const* stats, double con
}
}
/* TPS */
const auto tps = (current.getOpCount(OP_TASK) - prev.getOpCount(OP_TASK)) / duration_sec;
const auto tps = (current.getOpCount(OP_TRANSACTION) - prev.getOpCount(OP_TRANSACTION)) / duration_sec;
putFieldFloat(tps, 2);
if (fp) {
fprintf(fp, "\"tps\": %.2f,", tps);
@ -1473,9 +1456,9 @@ void printReport(Arguments const& args,
break;
}
}
const auto tps_f = final_stats.getOpCount(OP_TASK) / duration_sec;
const auto tps_f = final_stats.getOpCount(OP_TRANSACTION) / duration_sec;
const auto tps_i = static_cast<uint64_t>(tps_f);
fmt::printf("Total Xacts: %8lu\n", final_stats.getOpCount(OP_TASK));
fmt::printf("Total Xacts: %8lu\n", final_stats.getOpCount(OP_TRANSACTION));
fmt::printf("Total Conflicts: %8lu\n", final_stats.getConflictCount());
fmt::printf("Total Errors: %8lu\n", final_stats.getTotalErrorCount());
fmt::printf("Overall TPS: %8lu\n\n", tps_i);
@ -1487,7 +1470,7 @@ void printReport(Arguments const& args,
fmt::fprintf(fp, "\"totalThreads\": %d,", args.num_threads);
fmt::fprintf(fp, "\"totalAsyncXacts\": %d,", args.async_xacts);
fmt::fprintf(fp, "\"targetTPS\": %d,", args.tpsmax);
fmt::fprintf(fp, "\"totalXacts\": %lu,", final_stats.getOpCount(OP_TASK));
fmt::fprintf(fp, "\"totalXacts\": %lu,", final_stats.getOpCount(OP_TRANSACTION));
fmt::fprintf(fp, "\"totalConflicts\": %lu,", final_stats.getConflictCount());
fmt::fprintf(fp, "\"totalErrors\": %lu,", final_stats.getTotalErrorCount());
fmt::fprintf(fp, "\"overallTPS\": %lu,", tps_i);
@ -1503,7 +1486,7 @@ void printReport(Arguments const& args,
}
auto first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TASK && op != OP_TRANSACTION) || op == OP_COMMIT) {
if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TRANSACTION) || op == OP_COMMIT) {
putField(final_stats.getOpCount(op));
if (fp) {
if (first_op) {
@ -1517,7 +1500,7 @@ void printReport(Arguments const& args,
}
/* TPS */
const auto tps = final_stats.getOpCount(OP_TASK) / duration_sec;
const auto tps = final_stats.getOpCount(OP_TRANSACTION) / duration_sec;
putFieldFloat(tps, 2);
/* Conflicts */

View File

@ -97,8 +97,7 @@ enum OpKind {
OP_CLEARRANGE,
OP_SETCLEARRANGE,
OP_COMMIT,
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
OP_TASK, /* pseudo-operation - cumulative time for each iteraton in runWorkload */
OP_TRANSACTION, /* pseudo-operation - time it takes to run one iteration of ops sequence */
OP_READ_BG,
MAX_OP /* must be the last item */
};

View File

@ -275,7 +275,6 @@ const std::array<Operation, MAX_OP> opTable{
true },
{ "COMMIT", { { StepKind::NONE, nullptr } }, 0, false },
{ "TRANSACTION", { { StepKind::NONE, nullptr } }, 0, false },
{ "TASK", { { StepKind::NONE, nullptr } }, 0, false },
{ "READBLOBGRANULE",
{ { StepKind::ON_ERROR,
[](Transaction& tx, Arguments const& args, ByteString& begin, ByteString& end, ByteString&) {

View File

@ -48,7 +48,7 @@ enum class StepKind {
// Ops that doesn't have concrete steps to execute and are there for measurements only
force_inline bool isAbstractOp(int op) noexcept {
return op == OP_COMMIT || op == OP_TRANSACTION; // || op == OP_TASK;
return op == OP_COMMIT || op == OP_TRANSACTION;
}
using StepFunction = fdb::Future (*)(fdb::Transaction& tx,