Style changes: C > C++
* more controlled shared memory access * simplified thread arguments * use namespace 'mako' * separate headers by their concerns (time, statistics, shared memory) * introduce Stopwatch class
This commit is contained in:
parent
695022ad19
commit
bcf52e6760
|
@ -37,6 +37,7 @@
|
|||
#include "utils.hpp"
|
||||
|
||||
using namespace fdb;
|
||||
using namespace mako;
|
||||
|
||||
const std::string KEY_PREFIX{ "mako" };
|
||||
const std::string TEMP_DATA_STORE{ "/tmp/makoTemp" };
|
||||
|
@ -95,7 +96,7 @@ FutureRC wait_and_handle_error(TX tx, FutureType f, std::string_view operation)
|
|||
}
|
||||
|
||||
/* cleanup database */
|
||||
int cleanup(TX tx, mako_args_t const& args) {
|
||||
int cleanup(TX tx, args_t const& args) {
|
||||
auto beginstr = ByteString{};
|
||||
beginstr.reserve(args.key_length);
|
||||
genkeyprefix(beginstr, KEY_PREFIX, args);
|
||||
|
@ -129,11 +130,11 @@ int cleanup(TX tx, mako_args_t const& args) {
|
|||
|
||||
/* populate database */
|
||||
int populate(TX tx,
|
||||
mako_args_t const& args,
|
||||
args_t const& args,
|
||||
int worker_id,
|
||||
int thread_id,
|
||||
int thread_tps,
|
||||
mako_stats_t& stats,
|
||||
stats_t& stats,
|
||||
sample_bin_array_t& sample_bins) {
|
||||
const auto key_begin = insert_begin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
const auto key_end = insert_end(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
|
@ -144,6 +145,7 @@ int populate(TX tx,
|
|||
keystr.reserve(args.key_length);
|
||||
valstr.reserve(args.value_length);
|
||||
const auto num_commit_every = args.txnspec.ops[OP_INSERT][OP_COUNT];
|
||||
const auto num_seconds_trace_every = args.txntrace;
|
||||
auto watch_total = Stopwatch(start_at_ctor{});
|
||||
auto watch_throttle = Stopwatch(watch_total.get_start());
|
||||
auto watch_tx = Stopwatch(watch_total.get_start());
|
||||
|
@ -164,8 +166,8 @@ int populate(TX tx,
|
|||
usleep(1000);
|
||||
}
|
||||
}
|
||||
if (args.txntrace) {
|
||||
if (to_integer_seconds(watch_trace.stop().diff()) >= args.txntrace) {
|
||||
if (num_seconds_trace_every) {
|
||||
if (to_integer_seconds(watch_trace.stop().diff()) >= num_seconds_trace_every) {
|
||||
watch_trace.start_from_stop();
|
||||
fmt::print(debugme, "DEBUG: txn tracing {}\n", to_chars_ref(keystr));
|
||||
auto err = Error{};
|
||||
|
@ -192,12 +194,10 @@ int populate(TX tx,
|
|||
const auto rc = wait_and_handle_error(tx, future_commit, "COMMIT_POPULATE_INSERT");
|
||||
watch_commit.stop();
|
||||
watch_tx.set_stop(watch_commit.get_stop());
|
||||
auto tx_restarter = ExitGuard([&tx, &watch_tx]() {
|
||||
watch_tx.start_from_stop();
|
||||
tx.reset();
|
||||
});
|
||||
auto tx_restarter = ExitGuard([&watch_tx]() { watch_tx.start_from_stop(); });
|
||||
if (rc == FutureRC::OK) {
|
||||
key_checkpoint = i + 1; // restart on failures from next key
|
||||
tx.reset();
|
||||
} else if (rc == FutureRC::ABORT) {
|
||||
return -1;
|
||||
} else {
|
||||
|
@ -311,7 +311,7 @@ void granule_free_load(int64_t loadId, void* userContext) {
|
|||
context->data_by_id[loadId] = 0;
|
||||
}
|
||||
|
||||
inline int next_key(mako_args_t const& args) {
|
||||
inline int next_key(args_t const& args) {
|
||||
if (args.zipf)
|
||||
return zipfian_next();
|
||||
return urand(0, args.rows - 1);
|
||||
|
@ -335,20 +335,20 @@ const std::array<OpDesc, MAX_OP> op_desc{ { { "GRV", { StepKind::READ }, false }
|
|||
{ "READBLOBGRANULE", { StepKind::ON_ERROR }, false } } };
|
||||
|
||||
const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
||||
Future (*)(TX, mako_args_t const&, ByteString& /*key1*/, ByteString& /*key2*/, ByteString& /*value*/)>
|
||||
Future (*)(TX, args_t const&, ByteString& /*key1*/, ByteString& /*key2*/, ByteString& /*value*/)>
|
||||
operation_fn_table{
|
||||
{ { OP_GETREADVERSION, 0 },
|
||||
[](TX tx, mako_args_t const&, ByteString&, ByteString&, ByteString&) {
|
||||
[](TX tx, args_t const&, ByteString&, ByteString&, ByteString&) {
|
||||
return tx.get_read_version().erase_type();
|
||||
} },
|
||||
{ { OP_GET, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
const auto num = next_key(args);
|
||||
genkey(key, KEY_PREFIX, args, num);
|
||||
return tx.get(key, false /*snapshot*/).erase_type();
|
||||
} },
|
||||
{ { OP_GETRANGE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = next_key(args);
|
||||
genkey(begin, KEY_PREFIX, args, num_begin);
|
||||
auto num_end = num_begin + args.txnspec.ops[OP_GETRANGE][OP_RANGE] - 1;
|
||||
|
@ -367,13 +367,13 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
.erase_type();
|
||||
} },
|
||||
{ { OP_SGET, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
const auto num = next_key(args);
|
||||
genkey(key, KEY_PREFIX, args, num);
|
||||
return tx.get(key, true /*snapshot*/).erase_type();
|
||||
} },
|
||||
{ { OP_SGETRANGE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = next_key(args);
|
||||
genkey(begin, KEY_PREFIX, args, num_begin);
|
||||
auto num_end = num_begin + args.txnspec.ops[OP_SGETRANGE][OP_RANGE] - 1;
|
||||
|
@ -392,19 +392,19 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
.erase_type();
|
||||
} },
|
||||
{ { OP_UPDATE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
const auto num = next_key(args);
|
||||
genkey(key, KEY_PREFIX, args, num);
|
||||
return tx.get(key, false /*snapshot*/).erase_type();
|
||||
} },
|
||||
{ { OP_UPDATE, 1 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
randstr(value, args.value_length);
|
||||
tx.set(key, value);
|
||||
return Future();
|
||||
} },
|
||||
{ { OP_INSERT, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genkeyprefix(key, KEY_PREFIX, args);
|
||||
// concat([padding], key_prefix, random_string): reasonably unique
|
||||
randstr<false /*clear-before-append*/>(key, args.key_length - static_cast<int>(key.size()));
|
||||
|
@ -413,7 +413,7 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
return Future();
|
||||
} },
|
||||
{ { OP_INSERTRANGE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genkeyprefix(key, KEY_PREFIX, args);
|
||||
const auto prefix_len = static_cast<int>(key.size());
|
||||
const auto range = args.txnspec.ops[OP_INSERTRANGE][OP_RANGE];
|
||||
|
@ -432,20 +432,20 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
return Future();
|
||||
} },
|
||||
{ { OP_OVERWRITE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genkey(key, KEY_PREFIX, args, next_key(args));
|
||||
randstr(value, args.value_length);
|
||||
tx.set(key, value);
|
||||
return Future();
|
||||
} },
|
||||
{ { OP_CLEAR, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
genkey(key, KEY_PREFIX, args, next_key(args));
|
||||
tx.clear(key);
|
||||
return Future();
|
||||
} },
|
||||
{ { OP_SETCLEAR, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString& value) {
|
||||
genkeyprefix(key, KEY_PREFIX, args);
|
||||
const auto prefix_len = static_cast<int>(key.size());
|
||||
randstr<false /*append-after-clear*/>(key, args.key_length - prefix_len);
|
||||
|
@ -454,13 +454,13 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
return tx.commit().erase_type();
|
||||
} },
|
||||
{ { OP_SETCLEAR, 1 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& key, ByteString&, ByteString&) {
|
||||
tx.reset(); // assuming commit from step 0 worked.
|
||||
tx.clear(key); // key should forward unchanged from step 0
|
||||
return Future();
|
||||
} },
|
||||
{ { OP_CLEARRANGE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = next_key(args);
|
||||
genkey(begin, KEY_PREFIX, args, num_begin);
|
||||
const auto range = args.txnspec.ops[OP_CLEARRANGE][OP_RANGE];
|
||||
|
@ -470,7 +470,7 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
return Future();
|
||||
} },
|
||||
{ { OP_SETCLEARRANGE, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
|
||||
[](TX tx, args_t const& args, ByteString& key_begin, ByteString& key, ByteString& value) {
|
||||
genkeyprefix(key, KEY_PREFIX, args);
|
||||
const auto prefix_len = static_cast<int>(key.size());
|
||||
const auto range = args.txnspec.ops[OP_SETCLEARRANGE][OP_RANGE];
|
||||
|
@ -495,13 +495,13 @@ const std::map<std::pair<int /*op*/, int /*sub-op step*/>,
|
|||
return tx.commit().erase_type();
|
||||
} },
|
||||
{ { OP_SETCLEARRANGE, 1 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
tx.reset();
|
||||
tx.clear_range(begin, end);
|
||||
return Future();
|
||||
} },
|
||||
{ { OP_READ_BG, 0 },
|
||||
[](TX tx, mako_args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
[](TX tx, args_t const& args, ByteString& begin, ByteString& end, ByteString&) {
|
||||
const auto num_begin = next_key(args);
|
||||
genkey(begin, KEY_PREFIX, args, num_begin);
|
||||
const auto range = args.txnspec.ops[OP_READ_BG][OP_RANGE];
|
||||
|
@ -555,7 +555,7 @@ using OpIterator = std::tuple<int /*op*/, int /*count*/, int /*step*/>;
|
|||
|
||||
constexpr const OpIterator OpEnd = OpIterator(MAX_OP, -1, -1);
|
||||
|
||||
OpIterator get_op_begin(mako_args_t const& args) noexcept {
|
||||
OpIterator get_op_begin(args_t const& args) noexcept {
|
||||
for (auto op = 0; op < MAX_OP; op++) {
|
||||
if (op == OP_COMMIT || op == OP_TRANSACTION || args.txnspec.ops[op][OP_COUNT] == 0)
|
||||
continue;
|
||||
|
@ -564,7 +564,7 @@ OpIterator get_op_begin(mako_args_t const& args) noexcept {
|
|||
return OpEnd;
|
||||
}
|
||||
|
||||
OpIterator get_op_next(mako_args_t const& args, OpIterator current) noexcept {
|
||||
OpIterator get_op_next(args_t const& args, OpIterator current) noexcept {
|
||||
if (OpEnd == current)
|
||||
return OpEnd;
|
||||
auto [op, count, step] = current;
|
||||
|
@ -581,7 +581,7 @@ OpIterator get_op_next(mako_args_t const& args, OpIterator current) noexcept {
|
|||
}
|
||||
|
||||
/* run one transaction */
|
||||
int run_one_transaction(TX tx, mako_args_t const& args, mako_stats_t& stats, sample_bin_array_t& sample_bins) {
|
||||
int run_one_transaction(TX tx, args_t const& args, stats_t& stats, sample_bin_array_t& sample_bins) {
|
||||
// reuse memory for keys to avoid realloc overhead
|
||||
auto key1 = ByteString{};
|
||||
key1.reserve(args.key_length);
|
||||
|
@ -709,12 +709,12 @@ int run_one_transaction(TX tx, mako_args_t const& args, mako_stats_t& stats, sam
|
|||
}
|
||||
|
||||
int run_workload(TX tx,
|
||||
mako_args_t const& args,
|
||||
args_t const& args,
|
||||
int const thread_tps,
|
||||
std::atomic<double> const& throttle_factor,
|
||||
int const thread_iters,
|
||||
std::atomic<int> const& signal,
|
||||
mako_stats_t& stats,
|
||||
stats_t& stats,
|
||||
sample_bin_array_t& sample_bins,
|
||||
int const dotrace,
|
||||
int const dotagging) {
|
||||
|
@ -815,31 +815,28 @@ std::string get_stats_file_name(std::string_view dirname, int worker_id, int thr
|
|||
|
||||
/* mako worker thread */
|
||||
void worker_thread(thread_args_t& thread_args) {
|
||||
const auto& args = *thread_args.process->args;
|
||||
const auto parent_id = thread_args.process->parent_id;
|
||||
const auto worker_id = thread_args.process->worker_id;
|
||||
const auto& args = *thread_args.args;
|
||||
const auto parent_id = thread_args.parent_id;
|
||||
const auto worker_id = thread_args.worker_id;
|
||||
const auto thread_id = thread_args.thread_id;
|
||||
const auto dotrace = (worker_id == 0 && thread_id == 0 && args.txntrace) ? args.txntrace : 0;
|
||||
auto database = thread_args.database;
|
||||
const auto dotagging = args.txntagging;
|
||||
const auto database_index = thread_args.database_index;
|
||||
const auto& signal = thread_args.process->shm->signal;
|
||||
const auto& throttle_factor = thread_args.process->shm->throttle_factor;
|
||||
auto& readycount = thread_args.process->shm->readycount;
|
||||
auto& stopcount = thread_args.process->shm->stopcount;
|
||||
static_assert(std::is_same_v<decltype(std::decay_t<decltype(*thread_args.process)>::shm), mako_shmhdr_t*>);
|
||||
auto& stats = *(reinterpret_cast<mako_stats_t*>(thread_args.process->shm + 1) /* skip header */ +
|
||||
(worker_id * args.num_threads + thread_id));
|
||||
const auto& signal = thread_args.shm.header_const().signal;
|
||||
const auto& throttle_factor = thread_args.shm.header_const().throttle_factor;
|
||||
auto& readycount = thread_args.shm.header().readycount;
|
||||
auto& stopcount = thread_args.shm.header().stopcount;
|
||||
auto& stats = thread_args.shm.stats_slot(worker_id, thread_id);
|
||||
|
||||
/* init per-thread latency statistics */
|
||||
new (&stats) mako_stats_t();
|
||||
new (&stats) stats_t();
|
||||
|
||||
fmt::print(debugme,
|
||||
"DEBUG: worker_id:{} ({}) thread_id:{} ({}) database_index:{} (tid:{})\n",
|
||||
"DEBUG: worker_id:{} ({}) thread_id:{} ({}) (tid:{})\n",
|
||||
worker_id,
|
||||
args.num_processes,
|
||||
thread_id,
|
||||
args.num_threads,
|
||||
database_index,
|
||||
reinterpret_cast<uint64_t>(pthread_self()));
|
||||
|
||||
const auto thread_tps =
|
||||
|
@ -851,7 +848,6 @@ void worker_thread(thread_args_t& thread_args) {
|
|||
? 0
|
||||
: compute_thread_iters(args.iteration, worker_id, thread_id, args.num_processes, args.num_threads);
|
||||
|
||||
auto database = thread_args.process->databases[database_index];
|
||||
/* create my own transaction object */
|
||||
auto tx = database.create_tx();
|
||||
|
||||
|
@ -885,7 +881,7 @@ void worker_thread(thread_args_t& thread_args) {
|
|||
if (args.mode == MODE_BUILD || args.mode == MODE_RUN) {
|
||||
const auto dirname = fmt::format("{}{}", TEMP_DATA_STORE, parent_id);
|
||||
const auto rc = mkdir(dirname.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
|
||||
if (rc < 0) {
|
||||
if (rc < 0 && errno != EEXIST) {
|
||||
fmt::print(stderr, "ERROR: mkdir {}: {}\n", dirname, strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
@ -906,17 +902,10 @@ void worker_thread(thread_args_t& thread_args) {
|
|||
}
|
||||
|
||||
/* mako worker process */
|
||||
int worker_process_main(mako_args_t& args, int worker_id, mako_shmhdr_t* shm, pid_t pid_main) {
|
||||
process_info_t process;
|
||||
|
||||
process.worker_id = worker_id;
|
||||
process.parent_id = pid_main;
|
||||
process.args = &args;
|
||||
process.shm = shm;
|
||||
|
||||
int worker_process_main(args_t const& args, int worker_id, shm_access_t shm, pid_t pid_main) {
|
||||
fmt::print(debugme, "DEBUG: worker {} started\n", worker_id);
|
||||
|
||||
Error err;
|
||||
auto err = Error{};
|
||||
/* Everything starts from here */
|
||||
|
||||
select_api_version(args.api_version);
|
||||
|
@ -963,15 +952,19 @@ int worker_process_main(mako_args_t& args, int worker_id, mako_shmhdr_t* shm, pi
|
|||
|
||||
/* enable knobs if specified */
|
||||
if (args.knobs[0] != '\0') {
|
||||
char delim[] = ", ";
|
||||
auto knob = strtok(args.knobs, delim);
|
||||
while (knob != NULL) {
|
||||
fmt::print(debugme, "DEBUG: Setting client knobs: {}\n", knob);
|
||||
err = network::set_option_nothrow(FDB_NET_OPTION_KNOB, BytesRef(to_byte_ptr(knob)));
|
||||
auto knobs = std::string_view(args.knobs);
|
||||
const auto delim = std::string_view(", ");
|
||||
while (true) {
|
||||
knobs.remove_prefix(std::min(knobs.find_first_not_of(delim), knobs.size()));
|
||||
auto knob = knobs.substr(0, knobs.find_first_of(delim));
|
||||
if (knob.empty())
|
||||
break;
|
||||
fmt::print(debugme, "DEBUG: Setting client knob: {}\n", knob);
|
||||
err = network::set_option_nothrow(FDB_NET_OPTION_KNOB, to_bytes_ref(knob));
|
||||
if (err) {
|
||||
fmt::print(stderr, "ERROR: fdb_network_set_option: {}\n", err.what());
|
||||
fmt::print(stderr, "ERROR: fdb_network_set_option({}): {}\n", knob, err.what());
|
||||
}
|
||||
knob = strtok(NULL, delim);
|
||||
knobs.remove_prefix(knob.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1003,13 +996,14 @@ int worker_process_main(mako_args_t& args, int worker_id, mako_shmhdr_t* shm, pi
|
|||
|
||||
/*** let's party! ***/
|
||||
|
||||
auto databases = std::vector<fdb::Database>(args.num_databases);
|
||||
/* set up database for worker threads */
|
||||
for (auto i = 0; i < args.num_databases; i++) {
|
||||
size_t cluster_index = args.num_fdb_clusters <= 1 ? 0 : i % args.num_fdb_clusters;
|
||||
process.databases.emplace_back(Database(args.cluster_files[cluster_index]));
|
||||
databases[i] = Database(args.cluster_files[cluster_index]);
|
||||
fmt::print(debugme, "DEBUG: creating database at cluster {}\n", args.cluster_files[cluster_index]);
|
||||
if (args.disable_ryw) {
|
||||
process.databases.back().set_option(FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, BytesRef{});
|
||||
databases[i].set_option(FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, BytesRef{});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1021,16 +1015,19 @@ int worker_process_main(mako_args_t& args, int worker_id, mako_shmhdr_t* shm, pi
|
|||
|
||||
for (auto i = 0; i < args.num_threads; i++) {
|
||||
auto& this_args = thread_args[i];
|
||||
this_args.worker_id = worker_id;
|
||||
this_args.thread_id = i;
|
||||
this_args.database_index = i % args.num_databases;
|
||||
this_args.parent_id = pid_main;
|
||||
this_args.args = &args;
|
||||
this_args.shm = shm;
|
||||
this_args.database = databases[i % args.num_databases];
|
||||
|
||||
/* for ops to run, pre-allocate one latency sample block */
|
||||
for (int op = 0; op < MAX_OP; op++) {
|
||||
for (auto op = 0; op < MAX_OP; op++) {
|
||||
if (args.txnspec.ops[op][OP_COUNT] > 0 || op == OP_TRANSACTION || op == OP_COMMIT) {
|
||||
this_args.sample_bins[op].reserve_one();
|
||||
}
|
||||
}
|
||||
this_args.process = &process;
|
||||
worker_threads[i] = std::thread(worker_thread, std::ref(this_args));
|
||||
}
|
||||
|
||||
|
@ -1057,11 +1054,11 @@ int worker_process_main(mako_args_t& args, int worker_id, mako_shmhdr_t* shm, pi
|
|||
}
|
||||
|
||||
/* initialize the parameters with default values */
|
||||
int init_args(mako_args_t* args) {
|
||||
int init_args(args_t* args) {
|
||||
int i;
|
||||
if (!args)
|
||||
return -1;
|
||||
memset(args, 0, sizeof(mako_args_t)); /* zero-out everything */
|
||||
memset(args, 0, sizeof(args_t)); /* zero-out everything */
|
||||
args->num_fdb_clusters = 0;
|
||||
args->num_databases = 1;
|
||||
args->api_version = max_api_version();
|
||||
|
@ -1106,7 +1103,7 @@ int init_args(mako_args_t* args) {
|
|||
}
|
||||
|
||||
/* parse transaction specification */
|
||||
int parse_transaction(mako_args_t* args, char const* optarg) {
|
||||
int parse_transaction(args_t* args, char const* optarg) {
|
||||
char const* ptr = optarg;
|
||||
int op = 0;
|
||||
int rangeop = 0;
|
||||
|
@ -1279,7 +1276,7 @@ void usage() {
|
|||
}
|
||||
|
||||
/* parse benchmark paramters */
|
||||
int parse_args(int argc, char* argv[], mako_args_t* args) {
|
||||
int parse_args(int argc, char* argv[], args_t* args) {
|
||||
int rc;
|
||||
int c;
|
||||
int idx;
|
||||
|
@ -1542,7 +1539,7 @@ char const* get_ops_name(int ops_code) {
|
|||
return "";
|
||||
}
|
||||
|
||||
int validate_args(mako_args_t* args) {
|
||||
int validate_args(args_t* args) {
|
||||
if (args->mode == MODE_INVALID) {
|
||||
fprintf(stderr, "ERROR: --mode has to be set\n");
|
||||
return -1;
|
||||
|
@ -1605,10 +1602,10 @@ int validate_args(mako_args_t* args) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void print_stats(mako_args_t const& args, mako_stats_t const* stats, double const duration_sec, FILE* fp) {
|
||||
static mako_stats_t prev;
|
||||
void print_stats(args_t const& args, stats_t const* stats, double const duration_sec, FILE* fp) {
|
||||
static stats_t prev;
|
||||
|
||||
auto current = mako_stats_t{};
|
||||
auto current = stats_t{};
|
||||
for (auto i = 0; i < args.num_processes; i++) {
|
||||
for (auto j = 0; j < args.num_threads; j++) {
|
||||
current.combine(stats[(i * args.num_threads) + j]);
|
||||
|
@ -1665,7 +1662,7 @@ void print_stats(mako_args_t const& args, mako_stats_t const* stats, double cons
|
|||
prev = current;
|
||||
}
|
||||
|
||||
void print_stats_header(mako_args_t const& args, bool show_commit, bool is_first_header_empty, bool show_op_stats) {
|
||||
void print_stats_header(args_t const& args, bool show_commit, bool is_first_header_empty, bool show_op_stats) {
|
||||
/* header */
|
||||
if (is_first_header_empty)
|
||||
put_title("");
|
||||
|
@ -1709,13 +1706,9 @@ void print_stats_header(mako_args_t const& args, bool show_commit, bool is_first
|
|||
fmt::print("\n");
|
||||
}
|
||||
|
||||
void print_report(mako_args_t const& args,
|
||||
mako_stats_t const* stats,
|
||||
double const duration_sec,
|
||||
pid_t pid_main,
|
||||
FILE* fp) {
|
||||
void print_report(args_t const& args, stats_t const* stats, double const duration_sec, pid_t pid_main, FILE* fp) {
|
||||
|
||||
auto final_stats = mako_stats_t{};
|
||||
auto final_stats = stats_t{};
|
||||
for (auto i = 0; i < args.num_processes; i++) {
|
||||
for (auto j = 0; j < args.num_threads; j++) {
|
||||
const auto idx = i * args.num_threads + j;
|
||||
|
@ -1945,7 +1938,7 @@ void print_report(mako_args_t const& args,
|
|||
const auto filename = get_stats_file_name(dirname, i, j, op);
|
||||
auto fp = fopen(filename.c_str(), "r");
|
||||
if (!fp) {
|
||||
fmt::print(stderr, "ERROR: fopen({}): {}\n", strerror(errno));
|
||||
fmt::print(stderr, "ERROR: fopen({}): {}\n", filename, strerror(errno));
|
||||
continue;
|
||||
}
|
||||
auto fclose_guard = ExitGuard([fp]() { fclose(fp); });
|
||||
|
@ -2072,11 +2065,11 @@ void print_report(mako_args_t const& args,
|
|||
system(command_remove.c_str());
|
||||
}
|
||||
|
||||
int stats_process_main(mako_args_t const& args,
|
||||
mako_stats_t* stats,
|
||||
int stats_process_main(args_t const& args,
|
||||
stats_t const* stats,
|
||||
std::atomic<double>& throttle_factor,
|
||||
std::atomic<int>& signal,
|
||||
std::atomic<int>& stopcount,
|
||||
std::atomic<int> const& signal,
|
||||
std::atomic<int> const& stopcount,
|
||||
pid_t pid_main) {
|
||||
bool first_stats = true;
|
||||
|
||||
|
@ -2207,7 +2200,7 @@ int main(int argc, char* argv[]) {
|
|||
setlinebuf(stdout);
|
||||
|
||||
auto rc = int{};
|
||||
auto args = mako_args_t{};
|
||||
auto args = args_t{};
|
||||
rc = init_args(&args);
|
||||
if (rc < 0) {
|
||||
fmt::print(stderr, "ERROR: init_args failed\n");
|
||||
|
@ -2250,33 +2243,33 @@ int main(int argc, char* argv[]) {
|
|||
});
|
||||
|
||||
/* allocate */
|
||||
const auto shmsize = sizeof(mako_shmhdr_t) + (sizeof(mako_stats_t) * args.num_processes * args.num_threads);
|
||||
auto shm = static_cast<mako_shmhdr_t*>(nullptr);
|
||||
const auto shmsize = shm_storage_size(args.num_processes, args.num_threads);
|
||||
auto shm = std::add_pointer_t<void>{};
|
||||
if (ftruncate(shmfd, shmsize) < 0) {
|
||||
shm = static_cast<mako_shmhdr_t*>(MAP_FAILED);
|
||||
shm = MAP_FAILED;
|
||||
fmt::print(stderr, "ERROR: ftruncate (fd:{} size:{}) failed\n", shmfd, shmsize);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* map it */
|
||||
shm = static_cast<mako_shmhdr_t*>(mmap(NULL, shmsize, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0));
|
||||
shm = mmap(NULL, shmsize, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
|
||||
if (shm == MAP_FAILED) {
|
||||
fmt::print(stderr, "ERROR: mmap (fd:{} size:{}) failed\n", shmfd, shmsize);
|
||||
return -1;
|
||||
}
|
||||
auto munmap_guard = ExitGuard([=]() { munmap(shm, shmsize); });
|
||||
|
||||
static_assert(std::is_same_v<decltype(shm), mako_shmhdr_t*>);
|
||||
auto stats = reinterpret_cast<mako_stats_t*>(shm + 1);
|
||||
auto shm_access = shm_access_t(shm, args.num_processes, args.num_threads);
|
||||
|
||||
/* initialize the shared memory */
|
||||
memset(shm, 0, shmsize);
|
||||
shm_access.reset();
|
||||
|
||||
/* get ready */
|
||||
shm->signal = SIGNAL_OFF;
|
||||
shm->readycount = 0;
|
||||
shm->stopcount = 0;
|
||||
shm->throttle_factor = 1.0;
|
||||
auto& shm_hdr = shm_access.header();
|
||||
shm_hdr.signal = SIGNAL_OFF;
|
||||
shm_hdr.readycount = 0;
|
||||
shm_hdr.stopcount = 0;
|
||||
shm_hdr.throttle_factor = 1.0;
|
||||
|
||||
auto proc_type = proc_master;
|
||||
/* fork worker processes + 1 stats process */
|
||||
|
@ -2317,7 +2310,7 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
if (proc_type == proc_worker) {
|
||||
/* worker process */
|
||||
worker_process_main(args, worker_id, shm, pid_main);
|
||||
worker_process_main(args, worker_id, shm_access, pid_main);
|
||||
/* worker can exit here */
|
||||
exit(0);
|
||||
} else if (proc_type == proc_stats) {
|
||||
|
@ -2326,16 +2319,17 @@ int main(int argc, char* argv[]) {
|
|||
/* no stats needed for clean mode */
|
||||
exit(0);
|
||||
}
|
||||
stats_process_main(args, stats, shm->throttle_factor, shm->signal, shm->stopcount, pid_main);
|
||||
stats_process_main(
|
||||
args, shm_access.stats_const_array(), shm_hdr.throttle_factor, shm_hdr.signal, shm_hdr.stopcount, pid_main);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
/* master */
|
||||
/* wait for everyone to be ready */
|
||||
while (shm->readycount < (args.num_processes * args.num_threads)) {
|
||||
while (shm_hdr.readycount.load() < (args.num_processes * args.num_threads)) {
|
||||
usleep(1000);
|
||||
}
|
||||
shm->signal = SIGNAL_GREEN;
|
||||
shm_hdr.signal.store(SIGNAL_GREEN);
|
||||
|
||||
if (args.mode == MODE_RUN) {
|
||||
/* run the benchmark */
|
||||
|
@ -2361,7 +2355,7 @@ int main(int argc, char* argv[]) {
|
|||
}
|
||||
|
||||
/* notify everyone the time's up */
|
||||
shm->signal = SIGNAL_RED;
|
||||
shm_hdr.signal.store(SIGNAL_RED);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2382,7 +2376,7 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
/* all worker threads finished, stop the stats */
|
||||
if (args.mode == MODE_BUILD || args.iteration > 0) {
|
||||
shm->signal = SIGNAL_RED;
|
||||
shm_hdr.signal.store(SIGNAL_RED);
|
||||
}
|
||||
|
||||
/* wait for stats to stop */
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
#ifndef MAKO_HPP
|
||||
#define MAKO_HPP
|
||||
#pragma once
|
||||
|
||||
#ifndef FDB_API_VERSION
|
||||
#define FDB_API_VERSION 710
|
||||
|
@ -24,6 +23,12 @@
|
|||
#else
|
||||
#include <limits.h>
|
||||
#endif
|
||||
#include "operations.hpp"
|
||||
#include "shm.hpp"
|
||||
#include "stats.hpp"
|
||||
#include "time.hpp"
|
||||
|
||||
namespace mako {
|
||||
|
||||
constexpr const int VERBOSE_NONE = 0;
|
||||
constexpr const int VERBOSE_DEFAULT = 1;
|
||||
|
@ -35,34 +40,6 @@ constexpr const int MODE_CLEAN = 0;
|
|||
constexpr const int MODE_BUILD = 1;
|
||||
constexpr const int MODE_RUN = 2;
|
||||
|
||||
/* size of each block to get detailed latency for each operation */
|
||||
constexpr const size_t LAT_BLOCK_SIZE = 16384;
|
||||
|
||||
/* transaction specification */
|
||||
enum Operations {
|
||||
OP_GETREADVERSION,
|
||||
OP_GET,
|
||||
OP_GETRANGE,
|
||||
OP_SGET,
|
||||
OP_SGETRANGE,
|
||||
OP_UPDATE,
|
||||
OP_INSERT,
|
||||
OP_INSERTRANGE,
|
||||
OP_OVERWRITE,
|
||||
OP_CLEAR,
|
||||
OP_SETCLEAR,
|
||||
OP_CLEARRANGE,
|
||||
OP_SETCLEARRANGE,
|
||||
OP_COMMIT,
|
||||
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
|
||||
OP_READ_BG,
|
||||
MAX_OP /* must be the last item */
|
||||
};
|
||||
|
||||
constexpr const int OP_COUNT = 0;
|
||||
constexpr const int OP_RANGE = 1;
|
||||
constexpr const int OP_REVERSE = 2;
|
||||
|
||||
/* for long arguments */
|
||||
enum Arguments {
|
||||
ARG_KEYLEN,
|
||||
|
@ -94,14 +71,14 @@ enum Arguments {
|
|||
|
||||
enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
|
||||
|
||||
/* we set mako_txnspec_t and mako_args_t only once in the master process,
|
||||
/* we set txnspec_t and args_t only once in the master process,
|
||||
* and won't be touched by child processes.
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
struct txnspec_t {
|
||||
/* for each operation, it stores "count", "range" and "reverse" */
|
||||
int ops[MAX_OP][3];
|
||||
} mako_txnspec_t;
|
||||
};
|
||||
|
||||
constexpr const int LOGGROUP_MAX = 256;
|
||||
constexpr const int KNOB_MAX = 256;
|
||||
|
@ -111,7 +88,7 @@ constexpr const int NUM_DATABASES_MAX = 10;
|
|||
constexpr const int MAX_BG_IDS = 1000;
|
||||
|
||||
/* benchmark parameters */
|
||||
struct mako_args_t {
|
||||
struct args_t {
|
||||
int api_version;
|
||||
int json;
|
||||
int num_processes;
|
||||
|
@ -131,7 +108,7 @@ struct mako_args_t {
|
|||
int zipf;
|
||||
int commit_get;
|
||||
int verbose;
|
||||
mako_txnspec_t txnspec;
|
||||
txnspec_t txnspec;
|
||||
char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
|
||||
int num_fdb_clusters;
|
||||
int num_databases;
|
||||
|
@ -158,205 +135,21 @@ constexpr const int SIGNAL_RED = 0;
|
|||
constexpr const int SIGNAL_GREEN = 1;
|
||||
constexpr const int SIGNAL_OFF = 2;
|
||||
|
||||
struct mako_shmhdr_t {
|
||||
std::atomic<int> signal;
|
||||
std::atomic<int> readycount;
|
||||
std::atomic<double> throttle_factor;
|
||||
std::atomic<int> stopcount;
|
||||
};
|
||||
|
||||
/* per-process information */
|
||||
typedef struct {
|
||||
int worker_id;
|
||||
pid_t parent_id;
|
||||
mako_args_t* args;
|
||||
mako_shmhdr_t* shm;
|
||||
std::vector<fdb::Database> databases;
|
||||
} process_info_t;
|
||||
|
||||
/* time measurement helpers */
|
||||
using std::chrono::steady_clock;
|
||||
using timepoint_t = decltype(steady_clock::now());
|
||||
using timediff_t = decltype(std::declval<timepoint_t>()-std::declval<timepoint_t>());
|
||||
|
||||
template <typename Duration>
|
||||
double to_double_seconds(Duration duration) {
|
||||
return std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
|
||||
}
|
||||
|
||||
template <typename Duration>
|
||||
uint64_t to_integer_seconds(Duration duration) {
|
||||
return std::chrono::duration_cast<std::chrono::duration<uint64_t>>(duration).count();
|
||||
}
|
||||
|
||||
template <typename Duration>
|
||||
uint64_t to_integer_microseconds(Duration duration) {
|
||||
return std::chrono::duration_cast<std::chrono::duration<uint64_t, std::micro>>(duration).count();
|
||||
}
|
||||
|
||||
class alignas(64) mako_stats_t {
|
||||
uint64_t xacts;
|
||||
uint64_t conflicts;
|
||||
uint64_t total_errors;
|
||||
uint64_t ops[MAX_OP];
|
||||
uint64_t errors[MAX_OP];
|
||||
uint64_t latency_samples[MAX_OP];
|
||||
uint64_t latency_us_total[MAX_OP];
|
||||
uint64_t latency_us_min[MAX_OP];
|
||||
uint64_t latency_us_max[MAX_OP];
|
||||
|
||||
public:
|
||||
mako_stats_t() noexcept {
|
||||
memset(this, 0, sizeof(mako_stats_t));
|
||||
memset(latency_us_min, 0xff, sizeof(latency_us_min));
|
||||
}
|
||||
|
||||
mako_stats_t(const mako_stats_t& other) noexcept = default;
|
||||
mako_stats_t& operator=(const mako_stats_t& other) noexcept = default;
|
||||
|
||||
uint64_t get_tx_count() const noexcept { return xacts; }
|
||||
|
||||
uint64_t get_conflict_count() const noexcept { return conflicts; }
|
||||
|
||||
uint64_t get_op_count(int op) const noexcept { return ops[op]; }
|
||||
|
||||
uint64_t get_error_count(int op) const noexcept { return errors[op]; }
|
||||
|
||||
uint64_t get_total_error_count() const noexcept { return total_errors; }
|
||||
|
||||
uint64_t get_latency_sample_count(int op) const noexcept { return latency_samples[op]; }
|
||||
|
||||
uint64_t get_latency_us_total(int op) const noexcept { return latency_us_total[op]; }
|
||||
|
||||
uint64_t get_latency_us_min(int op) const noexcept { return latency_us_min[op]; }
|
||||
|
||||
uint64_t get_latency_us_max(int op) const noexcept { return latency_us_max[op]; }
|
||||
|
||||
// with 'this' as final aggregation, factor in 'other'
|
||||
void combine(const mako_stats_t& other) {
|
||||
xacts += other.xacts;
|
||||
conflicts += other.conflicts;
|
||||
for (auto op = 0; op < MAX_OP; op++) {
|
||||
ops[op] += other.ops[op];
|
||||
errors[op] += other.errors[op];
|
||||
total_errors += other.errors[op];
|
||||
latency_samples[op] += other.latency_samples[op];
|
||||
latency_us_total[op] += other.latency_us_total[op];
|
||||
if (latency_us_min[op] > other.latency_us_min[op])
|
||||
latency_us_min[op] = other.latency_us_min[op];
|
||||
if (latency_us_max[op] < other.latency_us_max[op])
|
||||
latency_us_max[op] = other.latency_us_max[op];
|
||||
}
|
||||
}
|
||||
|
||||
void incr_tx_count() noexcept { xacts++; }
|
||||
void incr_conflict_count() noexcept { conflicts++; }
|
||||
|
||||
// non-commit write operations aren't measured for time.
|
||||
void incr_op_count(int op) noexcept { ops[op]++; }
|
||||
|
||||
void incr_error_count(int op) noexcept {
|
||||
total_errors++;
|
||||
errors[op]++;
|
||||
}
|
||||
|
||||
void add_latency(int op, timediff_t diff) noexcept {
|
||||
const auto latency_us = to_integer_microseconds(diff);
|
||||
latency_samples[op]++;
|
||||
latency_us_total[op] += latency_us;
|
||||
if (latency_us_min[op] > latency_us)
|
||||
latency_us_min[op] = latency_us;
|
||||
if (latency_us_max[op] < latency_us)
|
||||
latency_us_max[op] = latency_us;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
/* memory block allocated to each operation when collecting detailed latency */
|
||||
class lat_block_t {
|
||||
uint64_t samples[LAT_BLOCK_SIZE]{
|
||||
0,
|
||||
};
|
||||
uint32_t index{ 0 };
|
||||
|
||||
public:
|
||||
lat_block_t() noexcept = default;
|
||||
bool full() const noexcept { return index >= LAT_BLOCK_SIZE; }
|
||||
void put(timediff_t td) {
|
||||
assert(!full());
|
||||
samples[index++] = to_integer_microseconds(td);
|
||||
}
|
||||
// return {data block, number of samples}
|
||||
std::pair<uint64_t const*, size_t> data() const noexcept { return { samples, index }; }
|
||||
};
|
||||
|
||||
/* collect sampled latencies */
|
||||
class sample_bin {
|
||||
std::list<lat_block_t> blocks;
|
||||
|
||||
public:
|
||||
void reserve_one() {
|
||||
if (blocks.empty())
|
||||
blocks.emplace_back();
|
||||
}
|
||||
|
||||
void put(timediff_t td) {
|
||||
if (blocks.empty() || blocks.back().full())
|
||||
blocks.emplace_back();
|
||||
blocks.back().put(td);
|
||||
}
|
||||
|
||||
// iterate & apply for each block user function void(uint64_t const*, size_t)
|
||||
template <typename Func>
|
||||
void for_each_block(Func&& fn) const {
|
||||
for (const auto& block : blocks) {
|
||||
auto [ptr, cnt] = block.data();
|
||||
fn(ptr, cnt);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
using sample_bin_array_t = std::array<sample_bin, MAX_OP>;
|
||||
/* args for threads */
|
||||
struct alignas(64) thread_args_t {
|
||||
int worker_id;
|
||||
int thread_id;
|
||||
int database_index; // index of the database to do work to
|
||||
pid_t parent_id;
|
||||
sample_bin_array_t sample_bins;
|
||||
process_info_t* process;
|
||||
args_t const* args;
|
||||
shm_access_t shm;
|
||||
fdb::Database database; // database to work with
|
||||
};
|
||||
|
||||
/* process type */
|
||||
typedef enum { proc_master = 0, proc_worker, proc_stats } proc_type_t;
|
||||
|
||||
// determines how resultant future will be handled
|
||||
enum class StepKind {
|
||||
NONE, ///< not part of the table: OP_TRANSACTION, OP_COMMIT
|
||||
IMM, ///< non-future ops that return immediately: e.g. set, clear_range
|
||||
READ, ///< blockable reads: get(), get_range(), get_read_version, ...
|
||||
COMMIT, ///< self-explanatory
|
||||
ON_ERROR ///< future is a result of tx.on_error()
|
||||
};
|
||||
|
||||
class OpDesc {
|
||||
std::string_view name_;
|
||||
std::vector<StepKind> steps_;
|
||||
bool needs_commit_;
|
||||
|
||||
public:
|
||||
OpDesc(std::string_view name, std::vector<StepKind>&& steps, bool needs_commit)
|
||||
: name_(name), steps_(std::move(steps)), needs_commit_(needs_commit) {}
|
||||
|
||||
std::string_view name() const noexcept { return name_; }
|
||||
// what
|
||||
StepKind step_kind(int step) const noexcept {
|
||||
assert(step < steps());
|
||||
return steps_[step];
|
||||
}
|
||||
// how many steps in this op?
|
||||
int steps() const noexcept { return static_cast<int>(steps_.size()); }
|
||||
// does the op needs to commit some time after its final step?
|
||||
bool needs_commit() const noexcept { return needs_commit_; }
|
||||
};
|
||||
} // namespace mako
|
||||
|
||||
#endif /* MAKO_HPP */
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
#ifndef MAKO_OPERATIONS_HPP
|
||||
#define MAKO_OPERATIONS_HPP
|
||||
|
||||
#include <vector>
|
||||
#include <string_view>
|
||||
|
||||
namespace mako {
|
||||
|
||||
/* transaction specification */
|
||||
enum Operations {
|
||||
OP_GETREADVERSION,
|
||||
OP_GET,
|
||||
OP_GETRANGE,
|
||||
OP_SGET,
|
||||
OP_SGETRANGE,
|
||||
OP_UPDATE,
|
||||
OP_INSERT,
|
||||
OP_INSERTRANGE,
|
||||
OP_OVERWRITE,
|
||||
OP_CLEAR,
|
||||
OP_SETCLEAR,
|
||||
OP_CLEARRANGE,
|
||||
OP_SETCLEARRANGE,
|
||||
OP_COMMIT,
|
||||
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
|
||||
OP_READ_BG,
|
||||
MAX_OP /* must be the last item */
|
||||
};
|
||||
|
||||
constexpr const int OP_COUNT = 0;
|
||||
constexpr const int OP_RANGE = 1;
|
||||
constexpr const int OP_REVERSE = 2;
|
||||
|
||||
// determines how resultant future will be handled
|
||||
enum class StepKind {
|
||||
NONE, ///< not part of the table: OP_TRANSACTION, OP_COMMIT
|
||||
IMM, ///< non-future ops that return immediately: e.g. set, clear_range
|
||||
READ, ///< blockable reads: get(), get_range(), get_read_version, ...
|
||||
COMMIT, ///< self-explanatory
|
||||
ON_ERROR ///< future is a result of tx.on_error()
|
||||
};
|
||||
|
||||
class OpDesc {
|
||||
std::string_view name_;
|
||||
std::vector<StepKind> steps_;
|
||||
bool needs_commit_;
|
||||
|
||||
public:
|
||||
OpDesc(std::string_view name, std::vector<StepKind>&& steps, bool needs_commit)
|
||||
: name_(name), steps_(std::move(steps)), needs_commit_(needs_commit) {}
|
||||
|
||||
std::string_view name() const noexcept { return name_; }
|
||||
// what
|
||||
StepKind step_kind(int step) const noexcept {
|
||||
assert(step < steps());
|
||||
return steps_[step];
|
||||
}
|
||||
// how many steps in this op?
|
||||
int steps() const noexcept { return static_cast<int>(steps_.size()); }
|
||||
// does the op needs to commit some time after its final step?
|
||||
bool needs_commit() const noexcept { return needs_commit_; }
|
||||
};
|
||||
|
||||
} // namespace mako
|
||||
|
||||
#endif /* MAKO_OPERATIONS_HPP */
|
|
@ -0,0 +1,75 @@
|
|||
#ifndef MAKO_SHM_HPP
|
||||
#define MAKO_SHM_HPP
|
||||
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include "stats.hpp"
|
||||
|
||||
namespace mako {
|
||||
|
||||
struct shmhdr_t {
|
||||
std::atomic<int> signal;
|
||||
std::atomic<int> readycount;
|
||||
std::atomic<double> throttle_factor;
|
||||
std::atomic<int> stopcount;
|
||||
};
|
||||
|
||||
struct shm_layout_helper {
|
||||
shmhdr_t hdr;
|
||||
stats_t stats;
|
||||
};
|
||||
|
||||
inline size_t shm_storage_size(int num_processes, int num_threads) noexcept {
|
||||
assert(num_processes >= 1 && num_threads >= 1);
|
||||
return sizeof(shm_layout_helper) + sizeof(stats_t) * ((num_processes * num_threads) - 1);
|
||||
}
|
||||
|
||||
inline stats_t& shm_stats_slot(void* shm_base, int num_threads, int process_idx, int thread_idx) noexcept {
|
||||
return (&static_cast<shm_layout_helper*>(shm_base)->stats)[process_idx * num_threads + thread_idx];
|
||||
}
|
||||
|
||||
class shm_access_t {
|
||||
protected:
|
||||
void* base;
|
||||
int num_processes;
|
||||
int num_threads;
|
||||
|
||||
public:
|
||||
shm_access_t(void* shm, int num_processes, int num_threads) noexcept
|
||||
: base(shm), num_processes(num_processes), num_threads(num_threads) {}
|
||||
|
||||
shm_access_t() noexcept : shm_access_t(nullptr, 0, 0) {}
|
||||
|
||||
shm_access_t(const shm_access_t&) noexcept = default;
|
||||
|
||||
shm_access_t& operator=(const shm_access_t&) noexcept = default;
|
||||
|
||||
size_t storage_size() const noexcept { return shm_storage_size(num_processes, num_threads); }
|
||||
|
||||
void reset() noexcept { memset(base, 0, storage_size()); }
|
||||
|
||||
shmhdr_t const& header_const() const noexcept { return *static_cast<shmhdr_t const*>(base); }
|
||||
|
||||
shmhdr_t& header() const noexcept { return *static_cast<shmhdr_t*>(base); }
|
||||
|
||||
stats_t const* stats_const_array() const noexcept {
|
||||
return &shm_stats_slot(base, num_threads, 0 /*process_id*/, 0 /*thread_id*/);
|
||||
}
|
||||
|
||||
stats_t* stats_array() const noexcept {
|
||||
return &shm_stats_slot(base, num_threads, 0 /*process_id*/, 0 /*thread_id*/);
|
||||
}
|
||||
|
||||
stats_t const& stats_const_slot(int process_idx, int thread_idx) const noexcept {
|
||||
return shm_stats_slot(base, num_threads, process_idx, thread_idx);
|
||||
}
|
||||
|
||||
stats_t& stats_slot(int process_idx, int thread_idx) const noexcept {
|
||||
return shm_stats_slot(base, num_threads, process_idx, thread_idx);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace mako
|
||||
|
||||
#endif /* MAKO_SHM_HPP */
|
|
@ -0,0 +1,139 @@
|
|||
#ifndef MAKO_STATS_HPP
|
||||
#define MAKO_STATS_HPP
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <list>
|
||||
#include <utility>
|
||||
#include "operations.hpp"
|
||||
#include "time.hpp"
|
||||
|
||||
namespace mako {
|
||||
|
||||
/* size of each block to get detailed latency for each operation */
|
||||
constexpr const size_t LAT_BLOCK_SIZE = 4095;
|
||||
|
||||
/* memory block allocated to each operation when collecting detailed latency */
|
||||
class lat_block_t {
|
||||
uint64_t samples[LAT_BLOCK_SIZE]{
|
||||
0,
|
||||
};
|
||||
uint32_t index{ 0 };
|
||||
|
||||
public:
|
||||
lat_block_t() noexcept = default;
|
||||
bool full() const noexcept { return index >= LAT_BLOCK_SIZE; }
|
||||
void put(timediff_t td) {
|
||||
assert(!full());
|
||||
samples[index++] = to_integer_microseconds(td);
|
||||
}
|
||||
// return {data block, number of samples}
|
||||
std::pair<uint64_t const*, size_t> data() const noexcept { return { samples, index }; }
|
||||
};
|
||||
|
||||
/* collect sampled latencies */
|
||||
class sample_bin {
|
||||
std::list<lat_block_t> blocks;
|
||||
|
||||
public:
|
||||
void reserve_one() {
|
||||
if (blocks.empty())
|
||||
blocks.emplace_back();
|
||||
}
|
||||
|
||||
void put(timediff_t td) {
|
||||
if (blocks.empty() || blocks.back().full())
|
||||
blocks.emplace_back();
|
||||
blocks.back().put(td);
|
||||
}
|
||||
|
||||
// iterate & apply for each block user function void(uint64_t const*, size_t)
|
||||
template <typename Func>
|
||||
void for_each_block(Func&& fn) const {
|
||||
for (const auto& block : blocks) {
|
||||
auto [ptr, cnt] = block.data();
|
||||
fn(ptr, cnt);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class alignas(64) stats_t {
|
||||
uint64_t xacts;
|
||||
uint64_t conflicts;
|
||||
uint64_t total_errors;
|
||||
uint64_t ops[MAX_OP];
|
||||
uint64_t errors[MAX_OP];
|
||||
uint64_t latency_samples[MAX_OP];
|
||||
uint64_t latency_us_total[MAX_OP];
|
||||
uint64_t latency_us_min[MAX_OP];
|
||||
uint64_t latency_us_max[MAX_OP];
|
||||
|
||||
public:
|
||||
stats_t() noexcept {
|
||||
memset(this, 0, sizeof(stats_t));
|
||||
memset(latency_us_min, 0xff, sizeof(latency_us_min));
|
||||
}
|
||||
|
||||
stats_t(const stats_t& other) noexcept = default;
|
||||
stats_t& operator=(const stats_t& other) noexcept = default;
|
||||
|
||||
uint64_t get_tx_count() const noexcept { return xacts; }
|
||||
|
||||
uint64_t get_conflict_count() const noexcept { return conflicts; }
|
||||
|
||||
uint64_t get_op_count(int op) const noexcept { return ops[op]; }
|
||||
|
||||
uint64_t get_error_count(int op) const noexcept { return errors[op]; }
|
||||
|
||||
uint64_t get_total_error_count() const noexcept { return total_errors; }
|
||||
|
||||
uint64_t get_latency_sample_count(int op) const noexcept { return latency_samples[op]; }
|
||||
|
||||
uint64_t get_latency_us_total(int op) const noexcept { return latency_us_total[op]; }
|
||||
|
||||
uint64_t get_latency_us_min(int op) const noexcept { return latency_us_min[op]; }
|
||||
|
||||
uint64_t get_latency_us_max(int op) const noexcept { return latency_us_max[op]; }
|
||||
|
||||
// with 'this' as final aggregation, factor in 'other'
|
||||
void combine(const stats_t& other) {
|
||||
xacts += other.xacts;
|
||||
conflicts += other.conflicts;
|
||||
for (auto op = 0; op < MAX_OP; op++) {
|
||||
ops[op] += other.ops[op];
|
||||
errors[op] += other.errors[op];
|
||||
total_errors += other.errors[op];
|
||||
latency_samples[op] += other.latency_samples[op];
|
||||
latency_us_total[op] += other.latency_us_total[op];
|
||||
if (latency_us_min[op] > other.latency_us_min[op])
|
||||
latency_us_min[op] = other.latency_us_min[op];
|
||||
if (latency_us_max[op] < other.latency_us_max[op])
|
||||
latency_us_max[op] = other.latency_us_max[op];
|
||||
}
|
||||
}
|
||||
|
||||
void incr_tx_count() noexcept { xacts++; }
|
||||
void incr_conflict_count() noexcept { conflicts++; }
|
||||
|
||||
// non-commit write operations aren't measured for time.
|
||||
void incr_op_count(int op) noexcept { ops[op]++; }
|
||||
|
||||
void incr_error_count(int op) noexcept {
|
||||
total_errors++;
|
||||
errors[op]++;
|
||||
}
|
||||
|
||||
void add_latency(int op, timediff_t diff) noexcept {
|
||||
const auto latency_us = to_integer_microseconds(diff);
|
||||
latency_samples[op]++;
|
||||
latency_us_total[op] += latency_us;
|
||||
if (latency_us_min[op] > latency_us)
|
||||
latency_us_min[op] = latency_us;
|
||||
if (latency_us_max[op] < latency_us)
|
||||
latency_us_max[op] = latency_us;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace mako
|
||||
|
||||
#endif /* MAKO_STATS_HPP */
|
|
@ -0,0 +1,57 @@
|
|||
#ifndef MAKO_TIME_HPP
|
||||
#define MAKO_TIME_HPP
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace mako {
|
||||
|
||||
/* time measurement helpers */
|
||||
using std::chrono::steady_clock;
|
||||
using timepoint_t = decltype(steady_clock::now());
|
||||
using timediff_t = decltype(std::declval<timepoint_t>() - std::declval<timepoint_t>());
|
||||
|
||||
template <typename Duration>
|
||||
double to_double_seconds(Duration duration) {
|
||||
return std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
|
||||
}
|
||||
|
||||
template <typename Duration>
|
||||
uint64_t to_integer_seconds(Duration duration) {
|
||||
return std::chrono::duration_cast<std::chrono::duration<uint64_t>>(duration).count();
|
||||
}
|
||||
|
||||
template <typename Duration>
|
||||
uint64_t to_integer_microseconds(Duration duration) {
|
||||
return std::chrono::duration_cast<std::chrono::duration<uint64_t, std::micro>>(duration).count();
|
||||
}
|
||||
|
||||
// timing helpers
|
||||
struct start_at_ctor {};
|
||||
|
||||
class Stopwatch {
|
||||
timepoint_t p1, p2;
|
||||
|
||||
public:
|
||||
Stopwatch() noexcept = default;
|
||||
Stopwatch(start_at_ctor) noexcept { start(); }
|
||||
Stopwatch(timepoint_t start_time) noexcept : p1(start_time), p2() {}
|
||||
Stopwatch(const Stopwatch&) noexcept = default;
|
||||
Stopwatch& operator=(const Stopwatch&) noexcept = default;
|
||||
timepoint_t get_start() const noexcept { return p1; }
|
||||
timepoint_t get_stop() const noexcept { return p2; }
|
||||
void start() noexcept { p1 = steady_clock::now(); }
|
||||
Stopwatch& stop() noexcept {
|
||||
p2 = steady_clock::now();
|
||||
return *this;
|
||||
}
|
||||
Stopwatch& set_stop(timepoint_t p_stop) noexcept {
|
||||
p2 = p_stop;
|
||||
return *this;
|
||||
}
|
||||
void start_from_stop() noexcept { p1 = p2; }
|
||||
auto diff() const noexcept { return p2 - p1; }
|
||||
};
|
||||
|
||||
} // namespace mako
|
||||
|
||||
#endif /* MAKO_TIME_HPP */
|
|
@ -6,6 +6,8 @@
|
|||
#include <cstring>
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace mako {
|
||||
|
||||
/* uniform-distribution random */
|
||||
int urand(int low, int high) {
|
||||
double r = rand() / (1.0 + RAND_MAX);
|
||||
|
@ -47,3 +49,5 @@ int digits(int num) {
|
|||
}
|
||||
return digits;
|
||||
}
|
||||
|
||||
} // namespace mako
|
||||
|
|
|
@ -9,6 +9,9 @@
|
|||
#include <type_traits>
|
||||
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace mako {
|
||||
|
||||
/* uniform-distribution random */
|
||||
/* return a uniform random number between low and high, both inclusive */
|
||||
int urand(int low, int high);
|
||||
|
@ -69,7 +72,7 @@ int digits(int num);
|
|||
* (str) is appended with concat([padding], PREFIX)
|
||||
*/
|
||||
template <bool Clear = true, typename Char>
|
||||
void genkeyprefix(std::basic_string<Char>& str, std::string_view prefix, mako_args_t const& args) {
|
||||
void genkeyprefix(std::basic_string<Char>& str, std::string_view prefix, args_t const& args) {
|
||||
// concat('x' * padding_len, key_prefix)
|
||||
if constexpr (Clear)
|
||||
str.clear();
|
||||
|
@ -83,7 +86,7 @@ void genkeyprefix(std::basic_string<Char>& str, std::string_view prefix, mako_ar
|
|||
/* generate a key for a given key number */
|
||||
/* prefix is "mako" by default, prefixpadding = 1 means 'x' will be in front rather than trailing the keyname */
|
||||
template <bool Clear = true, typename Char>
|
||||
void genkey(std::basic_string<Char>& str, std::string_view prefix, mako_args_t const& args, int num) {
|
||||
void genkey(std::basic_string<Char>& str, std::string_view prefix, args_t const& args, int num) {
|
||||
static_assert(sizeof(Char) == 1);
|
||||
const auto pad_len = args.prefixpadding ? args.key_length - (static_cast<int>(prefix.size()) + args.row_digits) : 0;
|
||||
assert(pad_len >= 0);
|
||||
|
@ -127,40 +130,6 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
// timing helpers
|
||||
struct start_at_ctor{};
|
||||
|
||||
class Stopwatch {
|
||||
timepoint_t p1, p2;
|
||||
public:
|
||||
Stopwatch() noexcept = default;
|
||||
Stopwatch(start_at_ctor) noexcept {
|
||||
start();
|
||||
}
|
||||
Stopwatch(timepoint_t start_time) noexcept : p1(start_time), p2() {}
|
||||
Stopwatch(const Stopwatch&) noexcept = default;
|
||||
Stopwatch& operator=(const Stopwatch&) noexcept = default;
|
||||
timepoint_t get_start() const noexcept { return p1; }
|
||||
timepoint_t get_stop() const noexcept { return p2; }
|
||||
void start() noexcept {
|
||||
p1 = steady_clock::now();
|
||||
}
|
||||
Stopwatch& stop() noexcept {
|
||||
p2 = steady_clock::now();
|
||||
return *this;
|
||||
}
|
||||
Stopwatch& set_stop(timepoint_t p_stop) noexcept {
|
||||
p2 = p_stop;
|
||||
return *this;
|
||||
}
|
||||
void start_from_stop() noexcept {
|
||||
p1 = p2;
|
||||
}
|
||||
auto diff() const noexcept {
|
||||
return p2 - p1;
|
||||
}
|
||||
};
|
||||
|
||||
// trace helpers
|
||||
constexpr const int STATS_TITLE_WIDTH = 12;
|
||||
constexpr const int STATS_FIELD_WIDTH = 12;
|
||||
|
@ -193,4 +162,6 @@ void put_field_f(Value&& value, int precision) {
|
|||
fmt::print("{0: >{1}.{2}f} ", std::forward<Value>(value), STATS_FIELD_WIDTH, precision);
|
||||
}
|
||||
|
||||
} // namespace mako
|
||||
|
||||
#endif /* UTILS_HPP */
|
||||
|
|
Loading…
Reference in New Issue