Adhere to sampling interval argument

* stopwatch class for easier timepoint management
This commit is contained in:
Junhyun Shim 2022-03-05 01:39:19 +01:00
parent ad3e1b2de2
commit f958e1dc0a
3 changed files with 159 additions and 108 deletions

View File

@ -106,7 +106,7 @@ int cleanup(TX tx, mako_args_t const& args) {
genkeyprefix(endstr, KEY_PREFIX, args);
endstr.push_back(0xff);
const auto time_start = steady_clock::now();
auto watch = Stopwatch(start_at_ctor{});
while (true) {
tx.clear_range(beginstr, endstr);
@ -123,8 +123,7 @@ int cleanup(TX tx, mako_args_t const& args) {
}
tx.reset();
const auto time_end = steady_clock::now();
fmt::print(printme, "INFO: Clear range: {:6.3f} sec\n", to_double_seconds(time_end - time_start));
fmt::print(printme, "INFO: Clear range: {:6.3f} sec\n", to_double_seconds(watch.stop().diff()));
return 0;
}
@ -144,11 +143,11 @@ int populate(TX tx,
auto valstr = ByteString{};
keystr.reserve(args.key_length);
valstr.reserve(args.value_length);
const auto time_start = steady_clock::now();
const auto num_commit_every = args.txnspec.ops[OP_INSERT][OP_COUNT];
auto time_prev = time_start; // for throttling
auto time_tx_start = time_start;
auto time_last_traced = time_start;
auto watch_total = Stopwatch(start_at_ctor{});
auto watch_throttle = Stopwatch(watch_total.get_start());
auto watch_tx = Stopwatch(watch_total.get_start());
auto watch_trace = Stopwatch(watch_total.get_start());
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
for (auto i = key_begin; i <= key_end; i++) {
@ -158,18 +157,16 @@ int populate(TX tx,
randstr(valstr, args.value_length);
while (thread_tps > 0 && xacts >= thread_tps /* throttle */) {
const auto time_now = steady_clock::now();
if (to_integer_seconds(time_now - time_prev) >= 1) {
if (to_integer_seconds(watch_throttle.stop().diff()) >= 1) {
xacts = 0;
time_prev = time_now;
watch_throttle.start_from_stop();
} else {
usleep(1000);
}
}
if (args.txntrace) {
const auto time_now = steady_clock::now();
if (to_integer_seconds(time_now - time_last_traced) >= args.txntrace) {
time_last_traced = time_now;
if (to_integer_seconds(watch_trace.stop().diff()) >= args.txntrace) {
watch_trace.start_from_stop();
fmt::print(debugme, "DEBUG: txn tracing {}\n", to_chars_ref(keystr));
auto err = Error{};
err = tx.set_option_nothrow(FDB_TR_OPTION_DEBUG_TRANSACTION_IDENTIFIER, keystr);
@ -185,50 +182,50 @@ int populate(TX tx,
/* insert (SET) */
tx.set(keystr, valstr);
stats.incr_count_immediate(OP_INSERT);
stats.incr_op_count(OP_INSERT);
/* commit every 100 inserts (default) or if this is the last key */
if (i == key_end || (i - key_begin + 1) % num_commit_every == 0) {
const auto is_sample_target = (stats.get_tx_count() % args.sampling) == 0;
auto time_commit_start = steady_clock::now();
const auto do_sample = (stats.get_tx_count() % args.sampling) == 0;
auto watch_commit = Stopwatch(start_at_ctor{});
auto future_commit = tx.commit();
const auto rc = wait_and_handle_error(tx, future_commit, "COMMIT_POPULATE_INSERT");
watch_commit.stop();
watch_tx.set_stop(watch_commit.get_stop());
auto tx_restarter = ExitGuard([&tx, &watch_tx]() {
watch_tx.start_from_stop();
tx.reset();
});
if (rc == FutureRC::OK) {
key_checkpoint = i + 1; // restart on failures from next key
} else if (rc == FutureRC::ABORT) {
return -1;
} else {
i = key_checkpoint - 1; // restart from last committed
time_tx_start = steady_clock::now(); // tx shall restart
continue;
}
const auto time_commit_end = steady_clock::now();
/* xact latency stats */
const auto commit_latency_us = to_integer_microseconds(time_commit_end - time_commit_start);
const auto tx_duration_us = to_integer_microseconds(time_commit_end - time_tx_start);
stats.add_latency(OP_COMMIT, commit_latency_us);
stats.add_latency(OP_TRANSACTION, tx_duration_us);
if (is_sample_target) {
sample_bins[OP_COMMIT].put(commit_latency_us);
sample_bins[OP_TRANSACTION].put(tx_duration_us);
if (do_sample) {
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.add_latency(OP_COMMIT, commit_latency);
stats.add_latency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incr_op_count(OP_COMMIT);
stats.incr_op_count(OP_TRANSACTION);
time_tx_start = steady_clock::now();
tx.reset();
stats.incr_tx_count();
xacts++; /* for throttling */
}
}
const auto time_end = steady_clock::now();
fmt::print(debugme,
"DEBUG: Populated {} rows [{}, {}]: {:6.3f} sec\n",
key_end - key_begin + 1,
key_begin,
key_end,
to_double_seconds(time_end - time_start));
to_double_seconds(watch_total.stop().diff()));
return 0;
}
@ -593,20 +590,20 @@ int run_one_transaction(TX tx, mako_args_t const& args, mako_stats_t& stats, sam
auto val = ByteString{};
val.reserve(args.value_length);
auto time_tx_start = steady_clock::now();
auto watch_tx = Stopwatch(start_at_ctor{});
auto op_iter = get_op_begin(args);
auto needs_commit = false;
auto time_per_op_start = std::array<timepoint_t, MAX_OP>{};
auto watch_per_op = std::array<Stopwatch, MAX_OP>{};
const auto do_sample = (stats.get_tx_count() % args.sampling) == 0;
while (op_iter != OpEnd) {
const auto [op, count, step] = op_iter;
const auto step_kind = op_desc[op].step_kind(step);
const auto op_key = std::make_pair(op, step);
const auto time_step_start = steady_clock::now();
if (step == 0 /* first step */) {
time_per_op_start[op] = time_step_start;
}
auto watch_step = Stopwatch{};
watch_step.start();
if (step == 0 /* first step */)
watch_per_op[op] = Stopwatch(watch_step.get_start());
auto f = operation_fn_table.at(op_key)(tx, args, key1, key2, val);
auto future_rc = FutureRC::OK;
if (f) {
@ -619,8 +616,7 @@ int run_one_transaction(TX tx, mako_args_t const& args, mako_stats_t& stats, sam
}
}
}
const auto time_step_end = steady_clock::now();
const auto step_usec = to_integer_microseconds(time_step_end - time_step_start);
watch_step.stop();
if (future_rc != FutureRC::OK) {
if (future_rc == FutureRC::CONFLICT) {
stats.incr_conflict_count();
@ -638,55 +634,72 @@ int run_one_transaction(TX tx, mako_args_t const& args, mako_stats_t& stats, sam
}
// step successful
if (step_kind == StepKind::COMMIT) {
const auto tx_usec = to_integer_microseconds(time_step_end - time_tx_start);
stats.add_latency(OP_COMMIT, step_usec);
stats.add_latency(OP_TRANSACTION, tx_usec);
// reset transaction boundary
const auto step_latency = watch_step.diff();
watch_tx.set_stop(watch_step.get_stop());
if (do_sample) {
sample_bins[OP_COMMIT].put(step_usec);
sample_bins[OP_TRANSACTION].put(tx_usec);
const auto tx_duration = watch_tx.diff();
stats.add_latency(OP_COMMIT, step_latency);
stats.add_latency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(step_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
time_tx_start = time_step_end; // new tx begins
tx.reset();
watch_tx.start_from_stop(); // new tx begins
stats.incr_op_count(OP_COMMIT);
stats.incr_op_count(OP_TRANSACTION);
needs_commit = false;
}
// op completed successfully
if (step + 1 == op_desc[op].steps() /* last step */) {
if (op_desc[op].needs_commit())
needs_commit = true;
const auto op_usec = to_integer_microseconds(time_step_end - time_per_op_start[op]);
stats.add_latency(op, op_usec);
if (do_sample)
sample_bins[op].put(op_usec);
watch_per_op[op].set_stop(watch_step.get_stop());
if (do_sample) {
const auto op_latency = watch_per_op[op].diff();
stats.add_latency(op, op_latency);
sample_bins[op].put(op_latency);
}
stats.incr_op_count(op);
}
// move to next op
op_iter = get_op_next(args, op_iter);
// reached the end?
if (op_iter == OpEnd && (needs_commit || args.commit_get)) {
const auto time_commit_start = steady_clock::now();
auto watch_commit = Stopwatch(start_at_ctor{});
auto f = tx.commit();
const auto rc = wait_and_handle_error(tx, f, "COMMIT_AT_TX_END");
const auto time_commit_end = steady_clock::now();
const auto commit_usec = to_integer_microseconds(time_commit_end - time_commit_start);
const auto tx_usec = to_integer_microseconds(time_commit_end - time_tx_start);
watch_commit.stop();
watch_tx.set_stop(watch_commit.get_stop());
auto tx_resetter = ExitGuard([&watch_tx, &tx]() {
tx.reset();
watch_tx.start_from_stop();
});
if (rc == FutureRC::OK) {
stats.add_latency(OP_COMMIT, commit_usec);
stats.add_latency(OP_TRANSACTION, tx_usec);
if (do_sample) {
sample_bins[OP_COMMIT].put(commit_usec);
sample_bins[OP_TRANSACTION].put(tx_usec);
const auto commit_latency = watch_commit.diff();
const auto tx_duration = watch_tx.diff();
stats.add_latency(OP_COMMIT, commit_latency);
stats.add_latency(OP_TRANSACTION, tx_duration);
sample_bins[OP_COMMIT].put(commit_latency);
sample_bins[OP_TRANSACTION].put(tx_duration);
}
stats.incr_op_count(OP_COMMIT);
stats.incr_op_count(OP_TRANSACTION);
} else {
if (rc == FutureRC::CONFLICT)
stats.incr_conflict_count();
else
stats.incr_error_count(OP_COMMIT);
if (rc == FutureRC::ABORT) {
tx.reset();
return -1;
}
// restart from beginning
op_iter = get_op_begin(args);
}
needs_commit = false;
}
}
stats.incr_tx_count();
@ -844,7 +857,7 @@ void worker_thread(thread_args_t& thread_args) {
/* i'm ready */
readycount.fetch_add(1);
auto stopcount_guard = exit_guard([&stopcount]() { stopcount.fetch_add(1); });
auto stopcount_guard = ExitGuard([&stopcount]() { stopcount.fetch_add(1); });
while (signal.load() == SIGNAL_OFF) {
usleep(10000); /* 10ms */
}
@ -884,7 +897,7 @@ void worker_thread(thread_args_t& thread_args) {
fmt::print(stderr, "ERROR: fopen({}): {}\n", filename, strerror(errno));
continue;
}
auto fclose_guard = exit_guard([fp]() { fclose(fp); });
auto fclose_guard = ExitGuard([fp]() { fclose(fp); });
thread_args.sample_bins[op].for_each_block(
[fp](auto ptr, auto count) { fwrite(ptr, sizeof(*ptr) * count, 1, fp); });
}
@ -1926,7 +1939,6 @@ void print_report(mako_args_t const& args,
const auto lat_total = final_stats.get_latency_us_total(op);
const auto lat_samples = final_stats.get_latency_sample_count(op);
if (lat_total && lat_samples) {
data_points[op].reserve(lat_samples);
for (auto i = 0; i < args.num_processes; i++) {
for (auto j = 0; j < args.num_threads; j++) {
const auto dirname = fmt::format("{}{}", TEMP_DATA_STORE, pid_main);
@ -1936,16 +1948,15 @@ void print_report(mako_args_t const& args,
fmt::print(stderr, "ERROR: fopen({}): {}\n", strerror(errno));
continue;
}
auto fclose_guard = exit_guard([fp]() { fclose(fp); });
auto fclose_guard = ExitGuard([fp]() { fclose(fp); });
fseek(fp, 0, SEEK_END);
const auto num_points = ftell(fp) / sizeof(uint64_t);
data_points[op].reserve(num_points);
fseek(fp, 0, 0);
auto index = 0u;
while (index < num_points) {
for (auto index = 0u; index < num_points; index++) {
auto value = uint64_t{};
fread(&value, sizeof(uint64_t), 1, fp);
data_points[op].push_back(value);
++index;
}
}
}
@ -2232,7 +2243,7 @@ int main(int argc, char* argv[]) {
fmt::print(stderr, "ERROR: shm_open failed\n");
return -1;
}
auto shmfd_guard = exit_guard([shmfd, &shmpath]() {
auto shmfd_guard = ExitGuard([shmfd, &shmpath]() {
close(shmfd);
shm_unlink(shmpath.c_str());
unlink(shmpath.c_str());
@ -2253,7 +2264,7 @@ int main(int argc, char* argv[]) {
fmt::print(stderr, "ERROR: mmap (fd:{} size:{}) failed\n", shmfd, shmsize);
return -1;
}
auto munmap_guard = exit_guard([=]() { munmap(shm, shmsize); });
auto munmap_guard = ExitGuard([=]() { munmap(shm, shmsize); });
static_assert(std::is_same_v<decltype(shm), mako_shmhdr_t*>);
auto stats = reinterpret_cast<mako_stats_t*>(shm + 1);

View File

@ -8,6 +8,8 @@
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <list>
#include <vector>
#include <string_view>
@ -34,7 +36,7 @@ constexpr const int MODE_BUILD = 1;
constexpr const int MODE_RUN = 2;
/* size of each block to get detailed latency for each operation */
constexpr const size_t LAT_BLOCK_SIZE = 511;
constexpr const size_t LAT_BLOCK_SIZE = 16384;
/* transaction specification */
enum Operations {
@ -163,6 +165,35 @@ struct mako_shmhdr_t {
std::atomic<int> stopcount;
};
/* per-process information */
typedef struct {
int worker_id;
pid_t parent_id;
mako_args_t* args;
mako_shmhdr_t* shm;
std::vector<fdb::Database> databases;
} process_info_t;
/* time measurement helpers */
using std::chrono::steady_clock;
using timepoint_t = decltype(steady_clock::now());
using timediff_t = decltype(std::declval<timepoint_t>()-std::declval<timepoint_t>());
template <typename Duration>
double to_double_seconds(Duration duration) {
return std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
}
template <typename Duration>
uint64_t to_integer_seconds(Duration duration) {
return std::chrono::duration_cast<std::chrono::duration<uint64_t>>(duration).count();
}
template <typename Duration>
uint64_t to_integer_microseconds(Duration duration) {
return std::chrono::duration_cast<std::chrono::duration<uint64_t, std::micro>>(duration).count();
}
class alignas(64) mako_stats_t {
uint64_t xacts;
uint64_t conflicts;
@ -222,32 +253,25 @@ public:
void incr_conflict_count() noexcept { conflicts++; }
// non-commit write operations aren't measured for time.
void incr_count_immediate(int op) noexcept { ops[op]++; }
void incr_op_count(int op) noexcept { ops[op]++; }
void incr_error_count(int op) noexcept {
total_errors++;
errors[op]++;
}
void add_latency(int op, uint64_t latency_us) noexcept {
void add_latency(int op, timediff_t diff) noexcept {
const auto latency_us = to_integer_microseconds(diff);
latency_samples[op]++;
latency_us_total[op] += latency_us;
if (latency_us_min[op] > latency_us)
latency_us_min[op] = latency_us;
if (latency_us_max[op] < latency_us)
latency_us_max[op] = latency_us;
ops[op]++;
}
};
/* per-process information */
typedef struct {
int worker_id;
pid_t parent_id;
mako_args_t* args;
mako_shmhdr_t* shm;
std::vector<fdb::Database> databases;
} process_info_t;
/* memory block allocated to each operation when collecting detailed latency */
class lat_block_t {
@ -259,9 +283,9 @@ class lat_block_t {
public:
lat_block_t() noexcept = default;
bool full() const noexcept { return index >= LAT_BLOCK_SIZE; }
void put(uint64_t point) {
void put(timediff_t td) {
assert(!full());
samples[index++] = point;
samples[index++] = to_integer_microseconds(td);
}
// return {data block, number of samples}
std::pair<uint64_t const*, size_t> data() const noexcept { return { samples, index }; }
@ -277,10 +301,10 @@ public:
blocks.emplace_back();
}
void put(uint64_t latency_us) {
void put(timediff_t td) {
if (blocks.empty() || blocks.back().full())
blocks.emplace_back();
blocks.back().put(latency_us);
blocks.back().put(td);
}
// iterate & apply for each block user function void(uint64_t const*, size_t)

View File

@ -5,9 +5,10 @@
#include "mako.hpp"
#include <cassert>
#include <chrono>
#include <fmt/format.h>
#include <stdint.h>
#include <cstdint>
#include <type_traits>
#include <fmt/format.h>
/* uniform-distribution random */
/* return a uniform random number between low and high, both inclusive */
int urand(int low, int high);
@ -102,24 +103,24 @@ void genkey(std::basic_string<Char>& str, std::string_view prefix, mako_args_t c
// invoke user-provided callable when object goes out of scope.
template <typename Func>
class exit_guard {
class ExitGuard {
std::decay_t<Func> fn;
public:
exit_guard(Func&& fn) : fn(std::forward<Func>(fn)) {}
ExitGuard(Func&& fn) : fn(std::forward<Func>(fn)) {}
~exit_guard() { fn(); }
~ExitGuard() { fn(); }
};
// invoke user-provided callable when stack unwinds by exception.
template <typename Func>
class fail_guard {
class FailGuard {
std::decay_t<Func> fn;
public:
fail_guard(Func&& fn) : fn(std::forward<Func>(fn)) {}
FailGuard(Func&& fn) : fn(std::forward<Func>(fn)) {}
~fail_guard() {
~FailGuard() {
if (std::uncaught_exceptions()) {
fn();
}
@ -127,23 +128,38 @@ public:
};
// timing helpers
using std::chrono::steady_clock;
using timepoint_t = decltype(steady_clock::now());
struct start_at_ctor{};
template <typename Duration>
double to_double_seconds(Duration duration) {
return std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
}
template <typename Duration>
uint64_t to_integer_seconds(Duration duration) {
return std::chrono::duration_cast<std::chrono::duration<uint64_t>>(duration).count();
}
template <typename Duration>
uint64_t to_integer_microseconds(Duration duration) {
return std::chrono::duration_cast<std::chrono::duration<uint64_t, std::micro>>(duration).count();
}
class Stopwatch {
timepoint_t p1, p2;
public:
Stopwatch() noexcept = default;
Stopwatch(start_at_ctor) noexcept {
start();
}
Stopwatch(timepoint_t start_time) noexcept : p1(start_time), p2() {}
Stopwatch(const Stopwatch&) noexcept = default;
Stopwatch& operator=(const Stopwatch&) noexcept = default;
timepoint_t get_start() const noexcept { return p1; }
timepoint_t get_stop() const noexcept { return p2; }
void start() noexcept {
p1 = steady_clock::now();
}
Stopwatch& stop() noexcept {
p2 = steady_clock::now();
return *this;
}
Stopwatch& set_stop(timepoint_t p_stop) noexcept {
p2 = p_stop;
return *this;
}
void start_from_stop() noexcept {
p1 = p2;
}
auto diff() const noexcept {
return p2 - p1;
}
};
// trace helpers
constexpr const int STATS_TITLE_WIDTH = 12;