From 8fb67d28cec5f7947bbd275dbabebbc22f97e1f5 Mon Sep 17 00:00:00 2001 From: Christian Wende Date: Mon, 23 Jan 2023 15:01:11 +0100 Subject: [PATCH] mako: Improve location of CPU utilization measurements and improve how multiple measurements are accumulated --- bindings/c/test/mako/mako.cpp | 116 +++++++++++++++++++++------------ bindings/c/test/mako/stats.hpp | 72 +++++++++++++------- 2 files changed, 124 insertions(+), 64 deletions(-) diff --git a/bindings/c/test/mako/mako.cpp b/bindings/c/test/mako/mako.cpp index 4c843e5bea..9df9b1d8cd 100644 --- a/bindings/c/test/mako/mako.cpp +++ b/bindings/c/test/mako/mako.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -29,6 +30,7 @@ #include #include #include +#include #include #if defined(__linux__) #include @@ -412,7 +414,7 @@ int runWorkload(Database db, std::atomic const& throttle_factor, int const thread_iters, std::atomic const& signal, - WorkflowStatistics& stats, + WorkflowStatistics& workflow_stats, int const dotrace, int const dotagging) { auto traceid = std::string{}; @@ -507,7 +509,7 @@ int runWorkload(Database db, } } - rc = runOneTransaction(tx, args, stats, key1, key2, val); + rc = runOneTransaction(tx, args, workflow_stats, key1, key2, val); if (rc) { logr.warn("runOneTransaction failed ({})", rc); } @@ -638,7 +640,6 @@ void runAsyncWorkload(Arguments const& args, /* mako worker thread */ void workerThread(const ThreadArgs& thread_args) { - auto time_start = steady_clock::now(); const auto& args = *thread_args.args; const auto parent_id = thread_args.parent_id; @@ -651,8 +652,10 @@ void workerThread(const ThreadArgs& thread_args) { const auto& throttle_factor = thread_args.shm.headerConst().throttle_factor; auto& readycount = thread_args.shm.header().readycount; auto& stopcount = thread_args.shm.header().stopcount; - auto& stats = thread_args.shm.workerStatsSlot(process_idx, thread_idx); + auto& workflow_stats = thread_args.shm.workerStatsSlot(process_idx, thread_idx); + auto& thread_stats = thread_args.shm.threadStatsSlot(process_idx, thread_idx); logr = Logger(WorkerProcess{}, args.verbose, process_idx, thread_idx); + thread_stats.startThreadTimer(); logr.debug("started, tid: {}", reinterpret_cast(pthread_self())); @@ -682,24 +685,23 @@ void workerThread(const ThreadArgs& thread_args) { logr.error("cleanup failed"); } } else if (args.mode == MODE_BUILD) { - auto rc = populate(database, thread_args, thread_tps, stats); + auto rc = populate(database, thread_args, thread_tps, workflow_stats); if (rc < 0) { logr.error("populate failed"); } } else if (args.mode == MODE_RUN) { - auto rc = - runWorkload(database, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, dotagging); + auto rc = runWorkload( + database, args, thread_tps, throttle_factor, thread_iters, signal, workflow_stats, dotrace, dotagging); if (rc < 0) { logr.error("runWorkload failed"); } } if (args.mode == MODE_BUILD || args.mode == MODE_RUN) { - dumpThreadSamples(args, parent_id, process_idx, thread_idx, stats); + dumpThreadSamples(args, parent_id, process_idx, thread_idx, workflow_stats); } - thread_args.shm.threadStatsSlot(process_idx, thread_idx) - .setCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) * 100.); + thread_stats.endThreadTimer(); } /* mako worker process */ @@ -716,10 +718,12 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc logr.debug("network::setup()"); network::setup(); + shm.processStatsSlot(process_idx).startProcessTimer(); + /* Each worker process will have its own network thread */ logr.debug("creating network thread"); auto network_thread = std::thread([parent_logr = logr, process_idx, shm]() { - auto time_start = steady_clock::now(); + shm.processStatsSlot(process_idx).startFDBNetworkTimer(); logr = parent_logr; logr.debug("network thread started"); @@ -727,9 +731,7 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc logr.error("network::run(): {}", err.what()); } - shm.processStatsSlot(process_idx) - .setFDBNetworkCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) * - 100.); + shm.processStatsSlot(process_idx).endFDBNetworkTimer(); }); #if defined(__linux__) pthread_setname_np(network_thread.native_handle(), "mako_network"); @@ -801,22 +803,21 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc auto worker_threads = std::vector(args.num_threads); for (auto i = 0; i < args.num_threads; i++) { worker_threads[i] = std::thread([&ctx, &args, process_idx, i, shm]() { - auto time_start = steady_clock::now(); + shm.threadStatsSlot(process_idx, i).startThreadTimer(); logr = Logger(WorkerProcess{}, args.verbose, process_idx); logr.debug("Async-mode worker thread {} started", i + 1); ctx.run(); logr.debug("Async-mode worker thread {} finished", i + 1); - shm.threadStatsSlot(process_idx, i) - .setCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) * - 100.); + shm.threadStatsSlot(process_idx, i).endThreadTimer(); }); #if defined(__linux__) const auto thread_name = "mako_worker_" + std::to_string(i); pthread_setname_np(worker_threads[i].native_handle(), thread_name.c_str()); #endif } + shm.header().readycount.fetch_add(args.num_threads); runAsyncWorkload(args, pid_main, process_idx, shm, ctx, databases); wg.reset(); @@ -824,6 +825,8 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc thread.join(); shm.header().stopcount.fetch_add(args.num_threads); } + + shm.processStatsSlot(process_idx).endProcessTimer(); return 0; } @@ -2070,19 +2073,57 @@ void printReport(Arguments const& args, FILE* fp) { auto final_worker_stats = WorkflowStatistics{}; - auto final_thread_stats = ThreadStatistics{}; - auto final_process_stats = ProcessStatistics{}; const auto num_workers = args.async_xacts > 0 ? args.async_xacts : args.num_threads; for (auto i = 0; i < args.num_processes * num_workers; i++) { final_worker_stats.combine(worker_stats[i]); } - for (auto i = 0; i < args.num_processes * args.num_threads; i++) { - final_thread_stats.combine(thread_stats[i]); - } - for (auto i = 0; i < args.num_processes; i++) { - final_process_stats.combine(process_stats[i]); - } + + double cpu_time_worker_threads = + std::accumulate(thread_stats, + thread_stats + args.num_processes * args.num_threads, + 0.0, + [](double x, const ThreadStatistics& s) { return x + s.getCPUTime(); }); + double total_duration_worker_threads = + std::accumulate(thread_stats, + thread_stats + args.num_processes * args.num_threads, + 0.0, + [](double x, const ThreadStatistics& s) { return x + s.getTotalDuration(); }) / + (args.num_processes * args.num_threads); // average + + double cpu_util_worker_threads = 100. * cpu_time_worker_threads / total_duration_worker_threads; + + double cpu_time_worker_processes = std::accumulate( + process_stats, process_stats + args.num_processes, 0.0, [](double x, const ProcessStatistics& s) { + return x + s.getProcessCPUTime(); + }); + + double total_duration_worker_processes = + std::accumulate(process_stats, + process_stats + args.num_processes, + 0.0, + [](double x, const ProcessStatistics& s) { return x + s.getProcessTotalDuration(); }) / + args.num_processes; // average + + double cpu_util_worker_processes = 100. * cpu_time_worker_processes / total_duration_worker_processes; + + double cpu_time_local_fdb_networks = std::accumulate( + process_stats, process_stats + args.num_processes, 0.0, [](double x, const ProcessStatistics& s) { + return x + s.getFDBNetworkCPUTime(); + }); + + double total_duration_local_fdb_networks = + std::accumulate(process_stats, + process_stats + args.num_processes, + 0.0, + [](double x, const ProcessStatistics& s) { return x + s.getProcessTotalDuration(); }) / + args.num_processes; // average + + double cpu_util_local_fdb_networks = 100. * cpu_time_local_fdb_networks / total_duration_local_fdb_networks; + + double cpu_util_external_fdb_networks = + 100. * (cpu_time_worker_processes - cpu_time_worker_threads - cpu_time_local_fdb_networks) / + (total_duration_local_fdb_networks); // assume that external networks have same total duration as local networks /* overall stats */ fmt::printf("\n====== Total Duration %6.3f sec ======\n\n", duration_sec); @@ -2110,19 +2151,16 @@ void printReport(Arguments const& args, } const auto tps_f = final_worker_stats.getOpCount(OP_TRANSACTION) / duration_sec; const auto tps_i = static_cast(tps_f); - const auto cpu_external_network = final_process_stats.getTotalCPUUtilization() - - final_thread_stats.getCPUUtilization() - - final_process_stats.getFDBNetworkCPUUtilization(); fmt::printf("Total Xacts: %8lu\n", final_worker_stats.getOpCount(OP_TRANSACTION)); fmt::printf("Total Conflicts: %8lu\n", final_worker_stats.getConflictCount()); fmt::printf("Total Errors: %8lu\n", final_worker_stats.getTotalErrorCount()); fmt::printf("Total Timeouts: %8lu\n", final_worker_stats.getTotalTimeoutCount()); fmt::printf("Overall TPS: %8lu\n\n", tps_i); - fmt::printf("%%CPU Worker Processes: %6.2f \n", final_process_stats.getTotalCPUUtilization()); - fmt::printf("%%CPU Worker Threads: %6.2f \n", final_thread_stats.getCPUUtilization()); - fmt::printf("%%CPU Local Network Threads: %6.2f \n", final_process_stats.getFDBNetworkCPUUtilization()); - fmt::printf("%%CPU External Network Threads: %6.2f \n\n", cpu_external_network); + fmt::printf("%%CPU Worker Processes: %6.2f \n", cpu_util_worker_processes); + fmt::printf("%%CPU Worker Threads: %6.2f \n", cpu_util_worker_threads); + fmt::printf("%%CPU Local Network Threads: %6.2f \n", cpu_util_local_fdb_networks); + fmt::printf("%%CPU External Network Threads: %6.2f \n\n", cpu_util_external_fdb_networks); if (fp) { fmt::fprintf(fp, "\"results\": {"); @@ -2136,10 +2174,10 @@ void printReport(Arguments const& args, fmt::fprintf(fp, "\"totalErrors\": %lu,", final_worker_stats.getTotalErrorCount()); fmt::fprintf(fp, "\"totalTimeouts\": %lu,", final_worker_stats.getTotalTimeoutCount()); fmt::fprintf(fp, "\"overallTPS\": %lu,", tps_i); - fmt::fprintf(fp, "\"workerProcesseCPU\": %.8f,", final_process_stats.getTotalCPUUtilization()); - fmt::fprintf(fp, "\"workerThreadCPU\": %.8f,", final_thread_stats.getCPUUtilization()); - fmt::fprintf(fp, "\"localNetworkCPU\": %.8f,", final_process_stats.getFDBNetworkCPUUtilization()); - fmt::fprintf(fp, "\"externalNetworkCPU\": %.8f,", cpu_external_network); + fmt::fprintf(fp, "\"workerProcesseCPU\": %.8f,", cpu_util_worker_processes); + fmt::fprintf(fp, "\"workerThreadCPU\": %.8f,", cpu_util_worker_threads); + fmt::fprintf(fp, "\"localNetworkCPU\": %.8f,", cpu_util_local_fdb_networks); + fmt::fprintf(fp, "\"externalNetworkCPU\": %.8f,", cpu_util_external_fdb_networks); } /* per-op stats */ @@ -2613,13 +2651,9 @@ int main(int argc, char* argv[]) { if (proc_type == ProcKind::WORKER) { /* worker process */ - auto time_start = steady_clock::now(); workerProcessMain(args, process_idx, shm_access, pid_main); - shm_access.processStatsSlot(process_idx) - .setCPUUtilization(getProcessorTimeProcess() / toDoubleSeconds(steady_clock::now() - time_start) * 100.); - /* worker can exit here */ _exit(0); } else if (proc_type == ProcKind::STATS) { /* stats */ diff --git a/bindings/c/test/mako/stats.hpp b/bindings/c/test/mako/stats.hpp index 24106df65c..d40de97151 100644 --- a/bindings/c/test/mako/stats.hpp +++ b/bindings/c/test/mako/stats.hpp @@ -22,6 +22,7 @@ #define MAKO_STATS_HPP #include +#include #include #include #include @@ -31,6 +32,7 @@ #include #include #include +#include "flow/Platform.h" #include "mako/mako.hpp" #include "operations.hpp" #include "time.hpp" @@ -282,43 +284,67 @@ inline std::ifstream& operator>>(std::ifstream& is, WorkflowStatistics& stats) { return is; } -class alignas(64) ThreadStatistics { - double cpu_utilization{ 0.0 }; +enum TimerKind { THREAD, PROCESS }; + +class CPUUtilizationTimer { + steady_clock::time_point timepoint_start; + steady_clock::time_point timepoint_end; + double cpu_time_start{ 0.0 }; + double cpu_time_end{ 0.0 }; + TimerKind kind; public: - ThreadStatistics() noexcept {} + CPUUtilizationTimer(TimerKind kind) : kind(kind) {} + void start() { + timepoint_start = steady_clock::now(); + cpu_time_start = (kind == THREAD) ? getProcessorTimeThread() : getProcessorTimeProcess(); + } + void end() { + timepoint_end = steady_clock::now(); + cpu_time_end = (kind == THREAD) ? getProcessorTimeThread() : getProcessorTimeProcess(); + } + double getCPUUtilization() const { + return (cpu_time_end - cpu_time_start) / toDoubleSeconds(timepoint_end - timepoint_start) * 100.; + } - ThreadStatistics(const ThreadStatistics& other) = default; - ThreadStatistics& operator=(const ThreadStatistics& other) = default; + double getCPUTime() const { return cpu_time_end - cpu_time_start; } - double getCPUUtilization() { return cpu_utilization; } - void setCPUUtilization(double cpu_utilization) { this->cpu_utilization = cpu_utilization; } + double getTotalDuration() const { return toDoubleSeconds(timepoint_end - timepoint_start); } +}; - void combine(const ThreadStatistics& other) { cpu_utilization += other.cpu_utilization; } +class alignas(64) ThreadStatistics { + CPUUtilizationTimer thread_timer; + +public: + ThreadStatistics() : thread_timer(THREAD) {} + + void startThreadTimer() { thread_timer.start(); } + void endThreadTimer() { thread_timer.end(); } + double getThreadCPUUtilization() const { return thread_timer.getCPUUtilization(); } + double getCPUTime() const { return thread_timer.getCPUTime(); } + double getTotalDuration() const { return thread_timer.getTotalDuration(); } }; class alignas(64) ProcessStatistics { - double total_cpu_utilization{ 0.0 }; - double fdb_network_cpu_utilization{ 0.0 }; + CPUUtilizationTimer process_timer; + CPUUtilizationTimer fdb_network_timer; public: - ProcessStatistics() noexcept {} + ProcessStatistics() : process_timer(PROCESS), fdb_network_timer(THREAD) {} + void startProcessTimer() { process_timer.start(); } + void endProcessTimer() { process_timer.end(); } - ProcessStatistics(const ProcessStatistics& other) = default; - ProcessStatistics& operator=(const ProcessStatistics& other) = default; + void startFDBNetworkTimer() { fdb_network_timer.start(); } + void endFDBNetworkTimer() { fdb_network_timer.end(); } - double getTotalCPUUtilization() { return total_cpu_utilization; } - double getFDBNetworkCPUUtilization() { return fdb_network_cpu_utilization; } + double getProcessCPUUtilization() const { return process_timer.getCPUUtilization(); } + double getFDBNetworkCPUUtilization() const { return fdb_network_timer.getCPUUtilization(); } - void setCPUUtilization(double total_cpu_utilization) { this->total_cpu_utilization = total_cpu_utilization; } - void setFDBNetworkCPUUtilization(double fdb_network_cpu_utilization) { - this->fdb_network_cpu_utilization = fdb_network_cpu_utilization; - } + double getProcessCPUTime() const { return process_timer.getCPUTime(); } + double getFDBNetworkCPUTime() const { return fdb_network_timer.getCPUTime(); } - void combine(const ProcessStatistics& other) { - total_cpu_utilization += other.total_cpu_utilization; - fdb_network_cpu_utilization += other.fdb_network_cpu_utilization; - } + double getProcessTotalDuration() const { return process_timer.getTotalDuration(); } + double getFDBNetworkTotalDuration() const { return fdb_network_timer.getTotalDuration(); } }; } // namespace mako