mako: add cpu usage statistics, including some refactoring of statistics accumulation

This commit is contained in:
Christian Wende 2023-01-20 11:14:36 +01:00
parent df0999bee2
commit 7d1e8e10f0
4 changed files with 297 additions and 135 deletions

View File

@ -41,7 +41,7 @@ struct ResumableStateForPopulate : std::enable_shared_from_this<ResumableStateFo
fdb::Transaction tx;
boost::asio::io_context& io_context;
Arguments const& args;
ThreadStatistics& stats;
WorkerStatistics& stats;
std::atomic<int>& stopcount;
int key_begin;
int key_end;
@ -57,7 +57,7 @@ struct ResumableStateForPopulate : std::enable_shared_from_this<ResumableStateFo
fdb::Transaction tx,
boost::asio::io_context& io_context,
Arguments const& args,
ThreadStatistics& stats,
WorkerStatistics& stats,
std::atomic<int>& stopcount,
int key_begin,
int key_end)
@ -79,7 +79,7 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStat
fdb::Transaction tx;
boost::asio::io_context& io_context;
Arguments const& args;
ThreadStatistics& stats;
WorkerStatistics& stats;
int64_t total_xacts;
std::atomic<int>& stopcount;
std::atomic<int> const& signal;
@ -99,7 +99,7 @@ struct ResumableStateForRunWorkload : std::enable_shared_from_this<ResumableStat
fdb::Transaction tx,
boost::asio::io_context& io_context,
Arguments const& args,
ThreadStatistics& stats,
WorkerStatistics& stats,
std::atomic<int>& stopcount,
std::atomic<int> const& signal,
int max_iters,

View File

@ -67,13 +67,14 @@
#include "time.hpp"
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "flow/Platform.h"
namespace mako {
/* args for threads */
struct alignas(64) ThreadArgs {
int worker_id;
int thread_id;
int process_idx;
int thread_idx;
int active_tenants;
int total_tenants;
pid_t parent_id;
@ -116,7 +117,7 @@ Transaction createNewTransaction(Database db, Arguments const& args, int id = -1
tr.setOption(FDB_TR_OPTION_AUTHORIZATION_TOKEN, token_map_iter->second);
} else {
logr.error("could not find token for tenant '{}'", tenant_name);
exit(1);
_exit(1);
}
}
return tr;
@ -179,10 +180,10 @@ int cleanupNormalKeyspace(Database db, Arguments const& args) {
}
/* populate database */
int populate(Database db, const ThreadArgs& thread_args, int thread_tps, ThreadStatistics& stats) {
int populate(Database db, const ThreadArgs& thread_args, int thread_tps, WorkerStatistics& stats) {
Arguments const& args = *thread_args.args;
const auto worker_id = thread_args.worker_id;
const auto thread_id = thread_args.thread_id;
const auto process_idx = thread_args.process_idx;
const auto thread_idx = thread_args.thread_idx;
auto xacts = 0;
auto keystr = ByteString{};
auto valstr = ByteString{};
@ -204,8 +205,8 @@ int populate(Database db, const ThreadArgs& thread_args, int thread_tps, ThreadS
// Each tenant should have the same range populated
for (auto t_id = 0; t_id < populate_iters; ++t_id) {
Transaction tx = createNewTransaction(db, args, t_id, args.active_tenants > 0 ? tenants : nullptr);
const auto key_begin = insertBegin(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
const auto key_end = insertEnd(args.rows, worker_id, thread_id, args.num_processes, args.num_threads);
const auto key_begin = insertBegin(args.rows, process_idx, thread_idx, args.num_processes, args.num_threads);
const auto key_end = insertEnd(args.rows, process_idx, thread_idx, args.num_processes, args.num_threads);
auto key_checkpoint = key_begin; // in case of commit failure, restart from this key
double required_keys = (key_end - key_begin + 1) * args.load_factor;
for (auto i = key_begin; i <= key_end; i++) {
@ -291,7 +292,7 @@ int populate(Database db, const ThreadArgs& thread_args, int thread_tps, ThreadS
return 0;
}
void updateErrorStatsRunMode(ThreadStatistics& stats, fdb::Error err, int op) {
void updateErrorStatsRunMode(WorkerStatistics& stats, fdb::Error err, int op) {
if (err) {
if (err.is(1020 /*not_commited*/)) {
stats.incrConflictCount();
@ -306,7 +307,7 @@ void updateErrorStatsRunMode(ThreadStatistics& stats, fdb::Error err, int op) {
/* run one iteration of configured transaction */
int runOneTransaction(Transaction& tx,
Arguments const& args,
ThreadStatistics& stats,
WorkerStatistics& stats,
ByteString& key1,
ByteString& key2,
ByteString& val) {
@ -411,7 +412,7 @@ int runWorkload(Database db,
std::atomic<double> const& throttle_factor,
int const thread_iters,
std::atomic<int> const& signal,
ThreadStatistics& stats,
WorkerStatistics& stats,
int const dotrace,
int const dotagging) {
auto traceid = std::string{};
@ -528,20 +529,20 @@ int runWorkload(Database db,
return rc;
}
std::string getStatsFilename(std::string_view dirname, int worker_id, int thread_id, int op) {
std::string getStatsFilename(std::string_view dirname, int process_idx, int thread_id, int op) {
return fmt::format("{}/{}_{}_{}", dirname, worker_id + 1, thread_id + 1, opTable[op].name());
return fmt::format("{}/{}_{}_{}", dirname, process_idx + 1, thread_id + 1, opTable[op].name());
}
std::string getStatsFilename(std::string_view dirname, int worker_id, int thread_id) {
return fmt::format("{}/{}_{}", dirname, worker_id + 1, thread_id + 1);
std::string getStatsFilename(std::string_view dirname, int process_idx, int thread_id) {
return fmt::format("{}/{}_{}", dirname, process_idx + 1, thread_id + 1);
}
void dumpThreadSamples(Arguments const& args,
pid_t parent_id,
int worker_id,
int process_idx,
int thread_id,
const ThreadStatistics& stats,
const WorkerStatistics& stats,
bool overwrite = true) {
const auto dirname = fmt::format("{}{}", TEMP_DATA_STORE, parent_id);
const auto rc = mkdir(dirname.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
@ -551,21 +552,21 @@ void dumpThreadSamples(Arguments const& args,
}
for (auto op = 0; op < MAX_OP; op++) {
if (args.txnspec.ops[op][OP_COUNT] > 0 || isAbstractOp(op)) {
stats.writeToFile(getStatsFilename(dirname, worker_id, thread_id, op), op);
stats.writeToFile(getStatsFilename(dirname, process_idx, thread_id, op), op);
}
}
}
void runAsyncWorkload(Arguments const& args,
pid_t pid_main,
int worker_id,
int process_idx,
shared_memory::Access shm,
boost::asio::io_context& io_context,
std::vector<Database>& databases) {
auto dump_samples = [&args, pid_main, worker_id](auto&& states) {
auto dump_samples = [&args, pid_main, process_idx](auto&& states) {
auto overwrite = true; /* overwrite or append */
for (const auto& state : states) {
dumpThreadSamples(args, pid_main, worker_id, 0 /*thread_id*/, state->stats, overwrite);
dumpThreadSamples(args, pid_main, process_idx, 0 /*thread_id*/, state->stats, overwrite);
overwrite = false;
}
};
@ -573,16 +574,16 @@ void runAsyncWorkload(Arguments const& args,
if (args.mode == MODE_BUILD) {
auto states = std::vector<PopulateStateHandle>(args.async_xacts);
for (auto i = 0; i < args.async_xacts; i++) {
const auto key_begin = insertBegin(args.rows, worker_id, i, args.num_processes, args.async_xacts);
const auto key_end = insertEnd(args.rows, worker_id, i, args.num_processes, args.async_xacts);
const auto key_begin = insertBegin(args.rows, process_idx, i, args.num_processes, args.async_xacts);
const auto key_end = insertEnd(args.rows, process_idx, i, args.num_processes, args.async_xacts);
auto db = databases[i % args.num_databases];
auto state =
std::make_shared<ResumableStateForPopulate>(Logger(WorkerProcess{}, args.verbose, worker_id, i),
std::make_shared<ResumableStateForPopulate>(Logger(WorkerProcess{}, args.verbose, process_idx, i),
db,
createNewTransaction(db, args),
io_context,
args,
shm.statsSlot(worker_id, i),
shm.workerStatsSlot(process_idx, i),
stopcount,
key_begin,
key_end);
@ -605,17 +606,17 @@ void runAsyncWorkload(Arguments const& args,
const auto max_iters =
args.iteration == 0
? -1
: computeThreadIters(args.iteration, worker_id, i, args.num_processes, args.async_xacts);
: computeThreadIters(args.iteration, process_idx, i, args.num_processes, args.async_xacts);
// argument validation should ensure max_iters > 0
assert(args.iteration == 0 || max_iters > 0);
auto state =
std::make_shared<ResumableStateForRunWorkload>(Logger(WorkerProcess{}, args.verbose, worker_id, i),
std::make_shared<ResumableStateForRunWorkload>(Logger(WorkerProcess{}, args.verbose, process_idx, i),
db,
createNewTransaction(db, args),
io_context,
args,
shm.statsSlot(worker_id, i),
shm.workerStatsSlot(process_idx, i),
stopcount,
shm.headerConst().signal,
max_iters,
@ -637,32 +638,34 @@ void runAsyncWorkload(Arguments const& args,
/* mako worker thread */
void workerThread(const ThreadArgs& thread_args) {
auto time_start = steady_clock::now();
const auto& args = *thread_args.args;
const auto parent_id = thread_args.parent_id;
const auto worker_id = thread_args.worker_id;
const auto thread_id = thread_args.thread_id;
const auto dotrace = (worker_id == 0 && thread_id == 0 && args.txntrace) ? args.txntrace : 0;
const auto process_idx = thread_args.process_idx;
const auto thread_idx = thread_args.thread_idx;
const auto dotrace = (process_idx == 0 && thread_idx == 0 && args.txntrace) ? args.txntrace : 0;
auto database = thread_args.database;
const auto dotagging = args.txntagging;
const auto& signal = thread_args.shm.headerConst().signal;
const auto& throttle_factor = thread_args.shm.headerConst().throttle_factor;
auto& readycount = thread_args.shm.header().readycount;
auto& stopcount = thread_args.shm.header().stopcount;
auto& stats = thread_args.shm.statsSlot(worker_id, thread_id);
logr = Logger(WorkerProcess{}, args.verbose, worker_id, thread_id);
auto& stats = thread_args.shm.workerStatsSlot(process_idx, thread_idx);
logr = Logger(WorkerProcess{}, args.verbose, process_idx, thread_idx);
logr.debug("started, tid: {}", reinterpret_cast<uint64_t>(pthread_self()));
const auto thread_tps =
args.tpsmax == 0 ? 0
: computeThreadTps(args.tpsmax, worker_id, thread_id, args.num_processes, args.num_threads);
: computeThreadTps(args.tpsmax, process_idx, thread_idx, args.num_processes, args.num_threads);
// argument validation should ensure thread_tps > 0
assert(args.tpsmax == 0 || thread_tps > 0);
const auto thread_iters =
args.iteration == 0
? -1
: computeThreadIters(args.iteration, worker_id, thread_id, args.num_processes, args.num_threads);
: computeThreadIters(args.iteration, process_idx, thread_idx, args.num_processes, args.num_threads);
// argument validation should ensure thread_iters > 0
assert(args.iteration == 0 || thread_iters > 0);
@ -692,12 +695,15 @@ void workerThread(const ThreadArgs& thread_args) {
}
if (args.mode == MODE_BUILD || args.mode == MODE_RUN) {
dumpThreadSamples(args, parent_id, worker_id, thread_id, stats);
dumpThreadSamples(args, parent_id, process_idx, thread_idx, stats);
}
thread_args.shm.threadStatsSlot(process_idx, thread_idx)
.setCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) * 100.);
}
/* mako worker process */
int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Access shm, pid_t pid_main) {
int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Access shm, pid_t pid_main) {
logr.debug("started");
auto err = Error{};
@ -712,12 +718,18 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
/* Each worker process will have its own network thread */
logr.debug("creating network thread");
auto network_thread = std::thread([parent_logr = logr]() {
auto network_thread = std::thread([parent_logr = logr, process_idx, shm]() {
auto time_start = steady_clock::now();
logr = parent_logr;
logr.debug("network thread started");
if (auto err = network::run()) {
logr.error("network::run(): {}", err.what());
}
shm.processStatsSlot(process_idx)
.setFDBNetworkCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) *
100.);
});
#if defined(__linux__)
pthread_setname_np(network_thread.native_handle(), "mako_network");
@ -762,8 +774,8 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
for (auto i = 0; i < args.num_threads; i++) {
auto& this_args = thread_args[i];
this_args.worker_id = worker_id;
this_args.thread_id = i;
this_args.process_idx = process_idx;
this_args.thread_idx = i;
this_args.parent_id = pid_main;
this_args.active_tenants = args.active_tenants;
this_args.total_tenants = args.total_tenants;
@ -788,11 +800,17 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
auto wg = WorkGuard(ctx.get_executor());
auto worker_threads = std::vector<std::thread>(args.num_threads);
for (auto i = 0; i < args.num_threads; i++) {
worker_threads[i] = std::thread([&ctx, &args, worker_id, i]() {
logr = Logger(WorkerProcess{}, args.verbose, worker_id);
worker_threads[i] = std::thread([&ctx, &args, process_idx, i, shm]() {
auto time_start = steady_clock::now();
logr = Logger(WorkerProcess{}, args.verbose, process_idx);
logr.debug("Async-mode worker thread {} started", i + 1);
ctx.run();
logr.debug("Async-mode worker thread {} finished", i + 1);
shm.threadStatsSlot(process_idx, i)
.setCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) *
100.);
});
#if defined(__linux__)
const auto thread_name = "mako_worker_" + std::to_string(i);
@ -800,7 +818,7 @@ int workerProcessMain(Arguments const& args, int worker_id, shared_memory::Acces
#endif
}
shm.header().readycount.fetch_add(args.num_threads);
runAsyncWorkload(args, pid_main, worker_id, shm, ctx, databases);
runAsyncWorkload(args, pid_main, process_idx, shm, ctx, databases);
wg.reset();
for (auto& thread : worker_threads)
thread.join();
@ -1387,7 +1405,7 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
break;
case ARG_VERSION:
logr.error("Version: {}", FDB_API_VERSION);
exit(0);
_exit(0);
break;
case ARG_COMMITGET:
args.commit_get = 1;
@ -1452,7 +1470,7 @@ int parseArguments(int argc, char* argv[], Arguments& args) {
case ARG_TXNTAGGINGPREFIX:
if (strlen(optarg) > TAGPREFIXLENGTH_MAX) {
logr.error("the length of txntagging_prefix is larger than {}", TAGPREFIXLENGTH_MAX);
exit(0);
_exit(0);
}
memcpy(args.txntagging_prefix, optarg, strlen(optarg));
break;
@ -1698,15 +1716,13 @@ void Arguments::generateAuthorizationTokens() {
logr.info("generated {} tokens in {:6.3f} seconds", active_tenants, toDoubleSeconds(stopwatch.stop().diff()));
}
void printStats(Arguments const& args, ThreadStatistics const* stats, double const duration_sec, FILE* fp) {
static ThreadStatistics prev;
void printStats(Arguments const& args, WorkerStatistics const* stats, double const duration_sec, FILE* fp) {
static WorkerStatistics prev;
const auto num_effective_threads = args.async_xacts > 0 ? args.async_xacts : args.num_threads;
auto current = ThreadStatistics{};
for (auto i = 0; i < args.num_processes; i++) {
for (auto j = 0; j < num_effective_threads; j++) {
current.combine(stats[(i * num_effective_threads) + j]);
}
const auto num_workers = args.async_xacts > 0 ? args.async_xacts : args.num_threads;
auto current = WorkerStatistics{};
for (auto i = 0; i < args.num_processes * num_workers; i++) {
current.combine(stats[i]);
}
if (fp) {
@ -1803,7 +1819,7 @@ void printStatsHeader(Arguments const& args, bool show_commit, bool is_first_hea
fmt::print("\n");
}
void printThreadStats(ThreadStatistics& final_stats, Arguments args, FILE* fp, bool is_report = false) {
void printWorkerStats(WorkerStatistics& final_stats, Arguments args, FILE* fp, bool is_report = false) {
if (is_report) {
for (auto op = 0; op < MAX_OP; op++) {
@ -2046,18 +2062,26 @@ void loadSample(int pid_main, int op, std::vector<DDSketchMako>& data_points, in
}
void printReport(Arguments const& args,
ThreadStatistics const* stats,
WorkerStatistics const* worker_stats,
ThreadStatistics const* thread_stats,
ProcessStatistics const* process_stats,
double const duration_sec,
pid_t pid_main,
FILE* fp) {
auto final_stats = ThreadStatistics{};
const auto num_effective_threads = args.async_xacts > 0 ? args.async_xacts : args.num_threads;
auto final_worker_stats = WorkerStatistics{};
auto final_thread_stats = ThreadStatistics{};
auto final_process_stats = ProcessStatistics{};
const auto num_workers = args.async_xacts > 0 ? args.async_xacts : args.num_threads;
for (auto i = 0; i < args.num_processes * num_workers; i++) {
final_worker_stats.combine(worker_stats[i]);
}
for (auto i = 0; i < args.num_processes * args.num_threads; i++) {
final_thread_stats.combine(thread_stats[i]);
}
for (auto i = 0; i < args.num_processes; i++) {
for (auto j = 0; j < num_effective_threads; j++) {
const auto idx = i * num_effective_threads + j;
final_stats.combine(stats[idx]);
}
final_process_stats.combine(process_stats[i]);
}
/* overall stats */
@ -2084,13 +2108,21 @@ void printReport(Arguments const& args,
break;
}
}
const auto tps_f = final_stats.getOpCount(OP_TRANSACTION) / duration_sec;
const auto tps_f = final_worker_stats.getOpCount(OP_TRANSACTION) / duration_sec;
const auto tps_i = static_cast<uint64_t>(tps_f);
fmt::printf("Total Xacts: %8lu\n", final_stats.getOpCount(OP_TRANSACTION));
fmt::printf("Total Conflicts: %8lu\n", final_stats.getConflictCount());
fmt::printf("Total Errors: %8lu\n", final_stats.getTotalErrorCount());
fmt::printf("Total Timeouts: %8lu\n", final_stats.getTotalTimeoutCount());
const auto cpu_external_network = final_process_stats.getTotalCPUUtilization() -
final_thread_stats.getCPUUtilization() -
final_process_stats.getFDBNetworkCPUUtilization();
fmt::printf("Total Xacts: %8lu\n", final_worker_stats.getOpCount(OP_TRANSACTION));
fmt::printf("Total Conflicts: %8lu\n", final_worker_stats.getConflictCount());
fmt::printf("Total Errors: %8lu\n", final_worker_stats.getTotalErrorCount());
fmt::printf("Total Timeouts: %8lu\n", final_worker_stats.getTotalTimeoutCount());
fmt::printf("Overall TPS: %8lu\n\n", tps_i);
fmt::printf("%%CPU Worker Processes: %6.2f \n", final_process_stats.getTotalCPUUtilization());
fmt::printf("%%CPU Worker Threads: %6.2f \n", final_thread_stats.getCPUUtilization());
fmt::printf("%%CPU Local Network Threads: %6.2f \n", final_process_stats.getFDBNetworkCPUUtilization());
fmt::printf("%%CPU External Network Threads: %6.2f \n\n", cpu_external_network);
if (fp) {
fmt::fprintf(fp, "\"results\": {");
@ -2099,11 +2131,15 @@ void printReport(Arguments const& args,
fmt::fprintf(fp, "\"totalThreads\": %d,", args.num_threads);
fmt::fprintf(fp, "\"totalAsyncXacts\": %d,", args.async_xacts);
fmt::fprintf(fp, "\"targetTPS\": %d,", args.tpsmax);
fmt::fprintf(fp, "\"totalXacts\": %lu,", final_stats.getOpCount(OP_TRANSACTION));
fmt::fprintf(fp, "\"totalConflicts\": %lu,", final_stats.getConflictCount());
fmt::fprintf(fp, "\"totalErrors\": %lu,", final_stats.getTotalErrorCount());
fmt::fprintf(fp, "\"totalTimeouts\": %lu,", final_stats.getTotalTimeoutCount());
fmt::fprintf(fp, "\"totalXacts\": %lu,", final_worker_stats.getOpCount(OP_TRANSACTION));
fmt::fprintf(fp, "\"totalConflicts\": %lu,", final_worker_stats.getConflictCount());
fmt::fprintf(fp, "\"totalErrors\": %lu,", final_worker_stats.getTotalErrorCount());
fmt::fprintf(fp, "\"totalTimeouts\": %lu,", final_worker_stats.getTotalTimeoutCount());
fmt::fprintf(fp, "\"overallTPS\": %lu,", tps_i);
fmt::fprintf(fp, "\"workerProcesseCPU\": %.8f,", final_process_stats.getTotalCPUUtilization());
fmt::fprintf(fp, "\"workerThreadCPU\": %.8f,", final_thread_stats.getCPUUtilization());
fmt::fprintf(fp, "\"localNetworkCPU\": %.8f,", final_process_stats.getFDBNetworkCPUUtilization());
fmt::fprintf(fp, "\"externalNetworkCPU\": %.8f,", cpu_external_network);
}
/* per-op stats */
@ -2117,24 +2153,24 @@ void printReport(Arguments const& args,
auto first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TRANSACTION) || op == OP_COMMIT) {
putField(final_stats.getOpCount(op));
putField(final_worker_stats.getOpCount(op));
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.getOpCount(op));
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_worker_stats.getOpCount(op));
}
}
}
/* TPS */
const auto tps = final_stats.getOpCount(OP_TRANSACTION) / duration_sec;
const auto tps = final_worker_stats.getOpCount(OP_TRANSACTION) / duration_sec;
putFieldFloat(tps, 2);
/* Conflicts */
const auto conflicts_rate = final_stats.getConflictCount() / duration_sec;
const auto conflicts_rate = final_worker_stats.getConflictCount() / duration_sec;
putFieldFloat(conflicts_rate, 2);
fmt::print("\n");
@ -2147,14 +2183,14 @@ void printReport(Arguments const& args,
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TRANSACTION) || op == OP_COMMIT) {
putField(final_stats.getErrorCount(op));
putField(final_worker_stats.getErrorCount(op));
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.getErrorCount(op));
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_worker_stats.getErrorCount(op));
}
}
}
@ -2168,14 +2204,14 @@ void printReport(Arguments const& args,
first_op = true;
for (auto op = 0; op < MAX_OP; op++) {
if ((args.txnspec.ops[op][OP_COUNT] > 0 && op != OP_TRANSACTION) || op == OP_COMMIT) {
putField(final_stats.getTimeoutCount(op));
putField(final_worker_stats.getTimeoutCount(op));
if (fp) {
if (first_op) {
first_op = false;
} else {
fmt::fprintf(fp, ",");
}
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_stats.getTimeoutCount(op));
fmt::fprintf(fp, "\"%s\": %lu", getOpName(op), final_worker_stats.getTimeoutCount(op));
}
}
}
@ -2200,14 +2236,14 @@ void printReport(Arguments const& args,
}
}
}
final_stats.updateLatencies(data_points);
final_worker_stats.updateLatencies(data_points);
printThreadStats(final_stats, args, fp);
printWorkerStats(final_worker_stats, args, fp);
// export the ddsketch if the flag was set
if (args.stats_export_path[0] != 0) {
std::ofstream f(args.stats_export_path);
f << final_stats;
f << final_worker_stats;
}
const auto command_remove = fmt::format("rm -rf {}{}", TEMP_DATA_STORE, pid_main);
@ -2218,7 +2254,9 @@ void printReport(Arguments const& args,
}
int statsProcessMain(Arguments const& args,
ThreadStatistics const* stats,
WorkerStatistics const* worker_stats,
ThreadStatistics const* thread_stats,
ProcessStatistics const* process_stats,
std::atomic<double>& throttle_factor,
std::atomic<int> const& signal,
std::atomic<int> const& stopcount,
@ -2327,7 +2365,7 @@ int statsProcessMain(Arguments const& args,
if (fp)
fmt::fprintf(fp, ",");
}
printStats(args, stats, toDoubleSeconds(time_now - time_prev), fp);
printStats(args, worker_stats, toDoubleSeconds(time_now - time_prev), fp);
}
time_prev = time_now;
}
@ -2343,7 +2381,8 @@ int statsProcessMain(Arguments const& args,
while (stopcount.load() < args.num_threads * args.num_processes) {
usleep(10000); /* 10ms */
}
printReport(args, stats, toDoubleSeconds(time_now - time_start), pid_main, fp);
printReport(
args, worker_stats, thread_stats, process_stats, toDoubleSeconds(time_now - time_start), pid_main, fp);
}
if (fp) {
@ -2354,12 +2393,12 @@ int statsProcessMain(Arguments const& args,
return 0;
}
ThreadStatistics mergeSketchReport(Arguments& args) {
WorkerStatistics mergeSketchReport(Arguments& args) {
ThreadStatistics stats;
WorkerStatistics stats;
for (int i = 0; i < args.num_report_files; i++) {
std::ifstream f{ args.report_files[i] };
ThreadStatistics tmp;
WorkerStatistics tmp;
f >> tmp;
stats.combine(tmp);
}
@ -2442,8 +2481,8 @@ int main(int argc, char* argv[]) {
}
if (args.mode == MODE_REPORT) {
ThreadStatistics stats = mergeSketchReport(args);
printThreadStats(stats, args, NULL, true);
WorkerStatistics stats = mergeSketchReport(args);
printWorkerStats(stats, args, NULL, true);
return 0;
}
@ -2502,9 +2541,9 @@ int main(int argc, char* argv[]) {
});
const auto async_mode = args.async_xacts > 0;
const auto nthreads_for_shm = async_mode ? args.async_xacts : args.num_threads;
const auto num_workers = async_mode ? args.async_xacts : args.num_threads;
/* allocate */
const auto shmsize = shared_memory::storageSize(args.num_processes, nthreads_for_shm);
const auto shmsize = shared_memory::storageSize(args.num_processes, args.num_threads, num_workers);
auto shm = std::add_pointer_t<void>{};
if (ftruncate(shmfd, shmsize) < 0) {
@ -2521,7 +2560,7 @@ int main(int argc, char* argv[]) {
}
auto munmap_guard = ExitGuard([=]() { munmap(shm, shmsize); });
auto shm_access = shared_memory::Access(shm, args.num_processes, nthreads_for_shm);
auto shm_access = shared_memory::Access(shm, args.num_processes, args.num_threads, num_workers);
/* initialize the shared memory */
shm_access.initMemory();
@ -2537,7 +2576,7 @@ int main(int argc, char* argv[]) {
/* fork worker processes + 1 stats process */
auto worker_pids = std::vector<pid_t>(args.num_processes + 1);
auto worker_id = int{};
auto process_idx = int{};
/* forking (num_process + 1) children */
/* last process is the stats handler */
@ -2554,7 +2593,7 @@ int main(int argc, char* argv[]) {
/* worker process */
logr = Logger(WorkerProcess{}, args.verbose, p);
proc_type = ProcKind::WORKER;
worker_id = p;
process_idx = p;
} else {
/* stats */
logr = Logger(StatsProcess{}, args.verbose);
@ -2574,18 +2613,29 @@ int main(int argc, char* argv[]) {
if (proc_type == ProcKind::WORKER) {
/* worker process */
workerProcessMain(args, worker_id, shm_access, pid_main);
auto time_start = steady_clock::now();
workerProcessMain(args, process_idx, shm_access, pid_main);
shm_access.processStatsSlot(process_idx)
.setCPUUtilization(getProcessorTimeProcess() / toDoubleSeconds(steady_clock::now() - time_start) * 100.);
/* worker can exit here */
exit(0);
_exit(0);
} else if (proc_type == ProcKind::STATS) {
/* stats */
if (args.mode == MODE_CLEAN) {
/* no stats needed for clean mode */
exit(0);
_exit(0);
}
statsProcessMain(
args, shm_access.statsConstArray(), shm_hdr.throttle_factor, shm_hdr.signal, shm_hdr.stopcount, pid_main);
exit(0);
statsProcessMain(args,
shm_access.workerStatsConstArray(),
shm_access.threadStatsConstArray(),
shm_access.processStatsConstArray(),
shm_hdr.throttle_factor,
shm_hdr.signal,
shm_hdr.stopcount,
pid_main);
_exit(0);
}
/* master */

View File

@ -43,63 +43,136 @@ struct Header {
struct LayoutHelper {
Header hdr;
ThreadStatistics stats;
WorkerStatistics stats;
};
inline size_t storageSize(int num_processes, int num_threads) noexcept {
inline size_t storageSize(int num_processes, int num_threads, int num_workers) noexcept {
assert(num_processes >= 1 && num_threads >= 1);
return sizeof(LayoutHelper) + sizeof(ThreadStatistics) * ((num_processes * num_threads) - 1);
return sizeof(LayoutHelper) + sizeof(WorkerStatistics) * ((num_processes * num_workers) - 1) +
sizeof(ThreadStatistics) * (num_threads * num_processes) + sizeof(ProcessStatistics) * num_processes;
}
// class Access memory layout:
// Header | WorkerStatistics | WorkerStatistics * (num_processes * num_workers - 1) | ThreadStatistics * (num_processes
// * num_threads) | ProcessStatistics * (num_processes)
// all Statistics classes have alignas(64)
class Access {
void* base;
int num_processes;
int num_threads;
int num_workers;
static inline ThreadStatistics& statsSlot(void* shm_base,
int num_threads,
int process_idx,
int thread_idx) noexcept {
return (&static_cast<LayoutHelper*>(shm_base)->stats)[process_idx * num_threads + thread_idx];
static inline WorkerStatistics& workerStatsSlot(void* shm_base,
int num_workers,
int process_idx,
int worker_idx) noexcept {
return (&static_cast<LayoutHelper*>(shm_base)->stats)[process_idx * num_workers + worker_idx];
}
static inline ThreadStatistics& threadStatsSlot(void* shm_base,
int num_processes,
int num_threads,
int num_workers,
int process_idx,
int thread_idx) noexcept {
ThreadStatistics* thread_stat_base =
reinterpret_cast<ThreadStatistics*>(static_cast<char*>(shm_base) + sizeof(LayoutHelper) +
sizeof(WorkerStatistics) * num_processes * num_workers);
return thread_stat_base[process_idx * num_threads + thread_idx];
}
static inline ProcessStatistics& processStatsSlot(void* shm_base,
int num_processes,
int num_threads,
int num_workers,
int process_idx) noexcept {
ProcessStatistics* proc_stat_base =
reinterpret_cast<ProcessStatistics*>(static_cast<char*>(shm_base) + sizeof(LayoutHelper) +
sizeof(WorkerStatistics) * num_processes * num_workers +
sizeof(ThreadStatistics) * num_processes * num_threads);
return proc_stat_base[process_idx];
}
public:
Access(void* shm, int num_processes, int num_threads) noexcept
: base(shm), num_processes(num_processes), num_threads(num_threads) {}
Access(void* shm, int num_processes, int num_threads, int num_workers) noexcept
: base(shm), num_processes(num_processes), num_threads(num_threads), num_workers(num_workers) {}
Access() noexcept : Access(nullptr, 0, 0) {}
Access() noexcept : Access(nullptr, 0, 0, 0) {}
Access(const Access&) noexcept = default;
Access& operator=(const Access&) noexcept = default;
size_t size() const noexcept { return storageSize(num_processes, num_threads); }
size_t size() const noexcept { return storageSize(num_processes, num_threads, num_workers); }
void initMemory() noexcept {
new (&header()) Header{};
for (auto i = 0; i < num_processes; i++)
for (auto j = 0; j < num_threads; j++)
new (&statsSlot(i, j)) ThreadStatistics();
for (auto j = 0; j < num_workers; j++) {
new (&workerStatsSlot(i, j)) WorkerStatistics();
}
for (auto i = 0; i < num_processes; i++)
for (auto j = 0; j < num_threads; j++) {
new (&threadStatsSlot(i, j)) ThreadStatistics();
}
for (auto i = 0; i < num_processes; i++) {
new (&processStatsSlot(i)) ProcessStatistics();
}
}
Header const& headerConst() const noexcept { return *static_cast<Header const*>(base); }
Header& header() const noexcept { return *static_cast<Header*>(base); }
ThreadStatistics const* statsConstArray() const noexcept {
return &statsSlot(base, num_threads, 0 /*process_id*/, 0 /*thread_id*/);
WorkerStatistics const* workerStatsConstArray() const noexcept {
return &workerStatsSlot(base, num_workers, 0 /*process_idx*/, 0 /*worker_idx*/);
}
ThreadStatistics* statsArray() const noexcept {
return &statsSlot(base, num_threads, 0 /*process_id*/, 0 /*thread_id*/);
WorkerStatistics* workerStatsArray() const noexcept {
return &workerStatsSlot(base, num_workers, 0 /*process_idx*/, 0 /*worker_idx*/);
}
ThreadStatistics const& statsConstSlot(int process_idx, int thread_idx) const noexcept {
return statsSlot(base, num_threads, process_idx, thread_idx);
WorkerStatistics const& workerStatsConstSlot(int process_idx, int worker_idx) const noexcept {
return workerStatsSlot(base, num_workers, process_idx, worker_idx);
}
ThreadStatistics& statsSlot(int process_idx, int thread_idx) const noexcept {
return statsSlot(base, num_threads, process_idx, thread_idx);
WorkerStatistics& workerStatsSlot(int process_idx, int worker_idx) const noexcept {
return workerStatsSlot(base, num_workers, process_idx, worker_idx);
}
ThreadStatistics const* threadStatsConstArray() const noexcept {
return &threadStatsSlot(base, num_processes, num_threads, num_workers, 0 /*process_idx*/, 0 /*thread_idx*/);
}
ThreadStatistics* threadStatsArray() const noexcept {
return &threadStatsSlot(base, num_processes, num_threads, num_workers, 0 /*process_idx*/, 0 /*thread_idx*/);
}
ThreadStatistics const& threadStatsConstSlot(int process_idx, int thread_idx) const noexcept {
return threadStatsSlot(base, num_processes, num_threads, num_workers, process_idx, thread_idx);
}
ThreadStatistics& threadStatsSlot(int process_idx, int thread_idx) const noexcept {
return threadStatsSlot(base, num_processes, num_threads, num_workers, process_idx, thread_idx);
}
ProcessStatistics const* processStatsConstArray() const noexcept {
return &processStatsSlot(base, num_processes, num_threads, num_workers, 0 /*process_idx*/);
}
ProcessStatistics* processStatsArray() const noexcept {
return &processStatsSlot(base, num_processes, num_threads, num_workers, 0 /*process_idx*/);
}
ProcessStatistics const& processStatsConstSlot(int process_idx) const noexcept {
return processStatsSlot(base, num_processes, num_threads, num_workers, process_idx);
}
ProcessStatistics& processStatsSlot(int process_idx) const noexcept {
return processStatsSlot(base, num_processes, num_threads, num_workers, process_idx);
}
};

View File

@ -88,7 +88,7 @@ public:
}
};
class alignas(64) ThreadStatistics {
class alignas(64) WorkerStatistics {
uint64_t conflicts{ 0 };
uint64_t total_errors{ 0 };
uint64_t total_timeouts{ 0 };
@ -100,7 +100,7 @@ class alignas(64) ThreadStatistics {
std::vector<DDSketchMako> sketches;
public:
ThreadStatistics() noexcept {
WorkerStatistics() noexcept {
std::fill(ops.begin(), ops.end(), 0);
std::fill(errors.begin(), errors.end(), 0);
std::fill(timeouts.begin(), timeouts.end(), 0);
@ -109,8 +109,8 @@ public:
sketches.resize(MAX_OP);
}
ThreadStatistics(const ThreadStatistics& other) = default;
ThreadStatistics& operator=(const ThreadStatistics& other) = default;
WorkerStatistics(const WorkerStatistics& other) = default;
WorkerStatistics& operator=(const WorkerStatistics& other) = default;
uint64_t getConflictCount() const noexcept { return conflicts; }
@ -137,7 +137,7 @@ public:
uint64_t mean(int op) const noexcept { return sketches[op].mean(); }
// with 'this' as final aggregation, factor in 'other'
void combine(const ThreadStatistics& other) {
void combine(const WorkerStatistics& other) {
conflicts += other.conflicts;
for (auto op = 0; op < MAX_OP; op++) {
sketches[op].mergeWith(other.sketches[op]);
@ -183,11 +183,11 @@ public:
void updateLatencies(const std::vector<DDSketchMako> other_sketches) { sketches = other_sketches; }
friend std::ofstream& operator<<(std::ofstream& os, ThreadStatistics& stats);
friend std::ifstream& operator>>(std::ifstream& is, ThreadStatistics& stats);
friend std::ofstream& operator<<(std::ofstream& os, WorkerStatistics& stats);
friend std::ifstream& operator>>(std::ifstream& is, WorkerStatistics& stats);
};
inline std::ofstream& operator<<(std::ofstream& os, ThreadStatistics& stats) {
inline std::ofstream& operator<<(std::ofstream& os, WorkerStatistics& stats) {
rapidjson::StringBuffer ss;
rapidjson::Writer<rapidjson::StringBuffer> writer(ss);
writer.StartObject();
@ -254,7 +254,7 @@ inline void populateArray(std::array<uint64_t, MAX_OP>& arr,
}
}
inline std::ifstream& operator>>(std::ifstream& is, ThreadStatistics& stats) {
inline std::ifstream& operator>>(std::ifstream& is, WorkerStatistics& stats) {
std::stringstream buffer;
buffer << is.rdbuf();
rapidjson::Document doc;
@ -282,6 +282,45 @@ inline std::ifstream& operator>>(std::ifstream& is, ThreadStatistics& stats) {
return is;
}
class alignas(64) ThreadStatistics {
double cpu_utilization{ 0.0 };
public:
ThreadStatistics() noexcept {}
ThreadStatistics(const ThreadStatistics& other) = default;
ThreadStatistics& operator=(const ThreadStatistics& other) = default;
double getCPUUtilization() { return cpu_utilization; }
void setCPUUtilization(double cpu_utilization) { this->cpu_utilization = cpu_utilization; }
void combine(const ThreadStatistics& other) { cpu_utilization += other.cpu_utilization; }
};
class alignas(64) ProcessStatistics {
double total_cpu_utilization{ 0.0 };
double fdb_network_cpu_utilization{ 0.0 };
public:
ProcessStatistics() noexcept {}
ProcessStatistics(const ProcessStatistics& other) = default;
ProcessStatistics& operator=(const ProcessStatistics& other) = default;
double getTotalCPUUtilization() { return total_cpu_utilization; }
double getFDBNetworkCPUUtilization() { return fdb_network_cpu_utilization; }
void setCPUUtilization(double total_cpu_utilization) { this->total_cpu_utilization = total_cpu_utilization; }
void setFDBNetworkCPUUtilization(double fdb_network_cpu_utilization) {
this->fdb_network_cpu_utilization = fdb_network_cpu_utilization;
}
void combine(const ProcessStatistics& other) {
total_cpu_utilization += other.total_cpu_utilization;
fdb_network_cpu_utilization += other.fdb_network_cpu_utilization;
}
};
} // namespace mako
#endif /* MAKO_STATS_HPP */