mako: Improve location of CPU utilization measurements and improve how multiple measurements are accumulated

This commit is contained in:
Christian Wende 2023-01-23 15:01:11 +01:00
parent 39528b62ea
commit 8fb67d28ce
2 changed files with 124 additions and 64 deletions

View File

@ -20,6 +20,7 @@
#include <array>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
@ -29,6 +30,7 @@
#include <fstream>
#include <map>
#include <new>
#include <numeric>
#include <optional>
#if defined(__linux__)
#include <pthread.h>
@ -412,7 +414,7 @@ int runWorkload(Database db,
std::atomic<double> const& throttle_factor,
int const thread_iters,
std::atomic<int> const& signal,
WorkflowStatistics& stats,
WorkflowStatistics& workflow_stats,
int const dotrace,
int const dotagging) {
auto traceid = std::string{};
@ -507,7 +509,7 @@ int runWorkload(Database db,
}
}
rc = runOneTransaction(tx, args, stats, key1, key2, val);
rc = runOneTransaction(tx, args, workflow_stats, key1, key2, val);
if (rc) {
logr.warn("runOneTransaction failed ({})", rc);
}
@ -638,7 +640,6 @@ void runAsyncWorkload(Arguments const& args,
/* mako worker thread */
void workerThread(const ThreadArgs& thread_args) {
auto time_start = steady_clock::now();
const auto& args = *thread_args.args;
const auto parent_id = thread_args.parent_id;
@ -651,8 +652,10 @@ void workerThread(const ThreadArgs& thread_args) {
const auto& throttle_factor = thread_args.shm.headerConst().throttle_factor;
auto& readycount = thread_args.shm.header().readycount;
auto& stopcount = thread_args.shm.header().stopcount;
auto& stats = thread_args.shm.workerStatsSlot(process_idx, thread_idx);
auto& workflow_stats = thread_args.shm.workerStatsSlot(process_idx, thread_idx);
auto& thread_stats = thread_args.shm.threadStatsSlot(process_idx, thread_idx);
logr = Logger(WorkerProcess{}, args.verbose, process_idx, thread_idx);
thread_stats.startThreadTimer();
logr.debug("started, tid: {}", reinterpret_cast<uint64_t>(pthread_self()));
@ -682,24 +685,23 @@ void workerThread(const ThreadArgs& thread_args) {
logr.error("cleanup failed");
}
} else if (args.mode == MODE_BUILD) {
auto rc = populate(database, thread_args, thread_tps, stats);
auto rc = populate(database, thread_args, thread_tps, workflow_stats);
if (rc < 0) {
logr.error("populate failed");
}
} else if (args.mode == MODE_RUN) {
auto rc =
runWorkload(database, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, dotagging);
auto rc = runWorkload(
database, args, thread_tps, throttle_factor, thread_iters, signal, workflow_stats, dotrace, dotagging);
if (rc < 0) {
logr.error("runWorkload failed");
}
}
if (args.mode == MODE_BUILD || args.mode == MODE_RUN) {
dumpThreadSamples(args, parent_id, process_idx, thread_idx, stats);
dumpThreadSamples(args, parent_id, process_idx, thread_idx, workflow_stats);
}
thread_args.shm.threadStatsSlot(process_idx, thread_idx)
.setCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) * 100.);
thread_stats.endThreadTimer();
}
/* mako worker process */
@ -716,10 +718,12 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc
logr.debug("network::setup()");
network::setup();
shm.processStatsSlot(process_idx).startProcessTimer();
/* Each worker process will have its own network thread */
logr.debug("creating network thread");
auto network_thread = std::thread([parent_logr = logr, process_idx, shm]() {
auto time_start = steady_clock::now();
shm.processStatsSlot(process_idx).startFDBNetworkTimer();
logr = parent_logr;
logr.debug("network thread started");
@ -727,9 +731,7 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc
logr.error("network::run(): {}", err.what());
}
shm.processStatsSlot(process_idx)
.setFDBNetworkCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) *
100.);
shm.processStatsSlot(process_idx).endFDBNetworkTimer();
});
#if defined(__linux__)
pthread_setname_np(network_thread.native_handle(), "mako_network");
@ -801,22 +803,21 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc
auto worker_threads = std::vector<std::thread>(args.num_threads);
for (auto i = 0; i < args.num_threads; i++) {
worker_threads[i] = std::thread([&ctx, &args, process_idx, i, shm]() {
auto time_start = steady_clock::now();
shm.threadStatsSlot(process_idx, i).startThreadTimer();
logr = Logger(WorkerProcess{}, args.verbose, process_idx);
logr.debug("Async-mode worker thread {} started", i + 1);
ctx.run();
logr.debug("Async-mode worker thread {} finished", i + 1);
shm.threadStatsSlot(process_idx, i)
.setCPUUtilization(getProcessorTimeThread() / toDoubleSeconds(steady_clock::now() - time_start) *
100.);
shm.threadStatsSlot(process_idx, i).endThreadTimer();
});
#if defined(__linux__)
const auto thread_name = "mako_worker_" + std::to_string(i);
pthread_setname_np(worker_threads[i].native_handle(), thread_name.c_str());
#endif
}
shm.header().readycount.fetch_add(args.num_threads);
runAsyncWorkload(args, pid_main, process_idx, shm, ctx, databases);
wg.reset();
@ -824,6 +825,8 @@ int workerProcessMain(Arguments const& args, int process_idx, shared_memory::Acc
thread.join();
shm.header().stopcount.fetch_add(args.num_threads);
}
shm.processStatsSlot(process_idx).endProcessTimer();
return 0;
}
@ -2070,19 +2073,57 @@ void printReport(Arguments const& args,
FILE* fp) {
auto final_worker_stats = WorkflowStatistics{};
auto final_thread_stats = ThreadStatistics{};
auto final_process_stats = ProcessStatistics{};
const auto num_workers = args.async_xacts > 0 ? args.async_xacts : args.num_threads;
for (auto i = 0; i < args.num_processes * num_workers; i++) {
final_worker_stats.combine(worker_stats[i]);
}
for (auto i = 0; i < args.num_processes * args.num_threads; i++) {
final_thread_stats.combine(thread_stats[i]);
}
for (auto i = 0; i < args.num_processes; i++) {
final_process_stats.combine(process_stats[i]);
}
double cpu_time_worker_threads =
std::accumulate(thread_stats,
thread_stats + args.num_processes * args.num_threads,
0.0,
[](double x, const ThreadStatistics& s) { return x + s.getCPUTime(); });
double total_duration_worker_threads =
std::accumulate(thread_stats,
thread_stats + args.num_processes * args.num_threads,
0.0,
[](double x, const ThreadStatistics& s) { return x + s.getTotalDuration(); }) /
(args.num_processes * args.num_threads); // average
double cpu_util_worker_threads = 100. * cpu_time_worker_threads / total_duration_worker_threads;
double cpu_time_worker_processes = std::accumulate(
process_stats, process_stats + args.num_processes, 0.0, [](double x, const ProcessStatistics& s) {
return x + s.getProcessCPUTime();
});
double total_duration_worker_processes =
std::accumulate(process_stats,
process_stats + args.num_processes,
0.0,
[](double x, const ProcessStatistics& s) { return x + s.getProcessTotalDuration(); }) /
args.num_processes; // average
double cpu_util_worker_processes = 100. * cpu_time_worker_processes / total_duration_worker_processes;
double cpu_time_local_fdb_networks = std::accumulate(
process_stats, process_stats + args.num_processes, 0.0, [](double x, const ProcessStatistics& s) {
return x + s.getFDBNetworkCPUTime();
});
double total_duration_local_fdb_networks =
std::accumulate(process_stats,
process_stats + args.num_processes,
0.0,
[](double x, const ProcessStatistics& s) { return x + s.getProcessTotalDuration(); }) /
args.num_processes; // average
double cpu_util_local_fdb_networks = 100. * cpu_time_local_fdb_networks / total_duration_local_fdb_networks;
double cpu_util_external_fdb_networks =
100. * (cpu_time_worker_processes - cpu_time_worker_threads - cpu_time_local_fdb_networks) /
(total_duration_local_fdb_networks); // assume that external networks have same total duration as local networks
/* overall stats */
fmt::printf("\n====== Total Duration %6.3f sec ======\n\n", duration_sec);
@ -2110,19 +2151,16 @@ void printReport(Arguments const& args,
}
const auto tps_f = final_worker_stats.getOpCount(OP_TRANSACTION) / duration_sec;
const auto tps_i = static_cast<uint64_t>(tps_f);
const auto cpu_external_network = final_process_stats.getTotalCPUUtilization() -
final_thread_stats.getCPUUtilization() -
final_process_stats.getFDBNetworkCPUUtilization();
fmt::printf("Total Xacts: %8lu\n", final_worker_stats.getOpCount(OP_TRANSACTION));
fmt::printf("Total Conflicts: %8lu\n", final_worker_stats.getConflictCount());
fmt::printf("Total Errors: %8lu\n", final_worker_stats.getTotalErrorCount());
fmt::printf("Total Timeouts: %8lu\n", final_worker_stats.getTotalTimeoutCount());
fmt::printf("Overall TPS: %8lu\n\n", tps_i);
fmt::printf("%%CPU Worker Processes: %6.2f \n", final_process_stats.getTotalCPUUtilization());
fmt::printf("%%CPU Worker Threads: %6.2f \n", final_thread_stats.getCPUUtilization());
fmt::printf("%%CPU Local Network Threads: %6.2f \n", final_process_stats.getFDBNetworkCPUUtilization());
fmt::printf("%%CPU External Network Threads: %6.2f \n\n", cpu_external_network);
fmt::printf("%%CPU Worker Processes: %6.2f \n", cpu_util_worker_processes);
fmt::printf("%%CPU Worker Threads: %6.2f \n", cpu_util_worker_threads);
fmt::printf("%%CPU Local Network Threads: %6.2f \n", cpu_util_local_fdb_networks);
fmt::printf("%%CPU External Network Threads: %6.2f \n\n", cpu_util_external_fdb_networks);
if (fp) {
fmt::fprintf(fp, "\"results\": {");
@ -2136,10 +2174,10 @@ void printReport(Arguments const& args,
fmt::fprintf(fp, "\"totalErrors\": %lu,", final_worker_stats.getTotalErrorCount());
fmt::fprintf(fp, "\"totalTimeouts\": %lu,", final_worker_stats.getTotalTimeoutCount());
fmt::fprintf(fp, "\"overallTPS\": %lu,", tps_i);
fmt::fprintf(fp, "\"workerProcesseCPU\": %.8f,", final_process_stats.getTotalCPUUtilization());
fmt::fprintf(fp, "\"workerThreadCPU\": %.8f,", final_thread_stats.getCPUUtilization());
fmt::fprintf(fp, "\"localNetworkCPU\": %.8f,", final_process_stats.getFDBNetworkCPUUtilization());
fmt::fprintf(fp, "\"externalNetworkCPU\": %.8f,", cpu_external_network);
fmt::fprintf(fp, "\"workerProcesseCPU\": %.8f,", cpu_util_worker_processes);
fmt::fprintf(fp, "\"workerThreadCPU\": %.8f,", cpu_util_worker_threads);
fmt::fprintf(fp, "\"localNetworkCPU\": %.8f,", cpu_util_local_fdb_networks);
fmt::fprintf(fp, "\"externalNetworkCPU\": %.8f,", cpu_util_external_fdb_networks);
}
/* per-op stats */
@ -2613,13 +2651,9 @@ int main(int argc, char* argv[]) {
if (proc_type == ProcKind::WORKER) {
/* worker process */
auto time_start = steady_clock::now();
workerProcessMain(args, process_idx, shm_access, pid_main);
shm_access.processStatsSlot(process_idx)
.setCPUUtilization(getProcessorTimeProcess() / toDoubleSeconds(steady_clock::now() - time_start) * 100.);
/* worker can exit here */
_exit(0);
} else if (proc_type == ProcKind::STATS) {
/* stats */

View File

@ -22,6 +22,7 @@
#define MAKO_STATS_HPP
#include <array>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <fstream>
@ -31,6 +32,7 @@
#include <new>
#include <ostream>
#include <utility>
#include "flow/Platform.h"
#include "mako/mako.hpp"
#include "operations.hpp"
#include "time.hpp"
@ -282,43 +284,67 @@ inline std::ifstream& operator>>(std::ifstream& is, WorkflowStatistics& stats) {
return is;
}
class alignas(64) ThreadStatistics {
double cpu_utilization{ 0.0 };
enum TimerKind { THREAD, PROCESS };
class CPUUtilizationTimer {
steady_clock::time_point timepoint_start;
steady_clock::time_point timepoint_end;
double cpu_time_start{ 0.0 };
double cpu_time_end{ 0.0 };
TimerKind kind;
public:
ThreadStatistics() noexcept {}
CPUUtilizationTimer(TimerKind kind) : kind(kind) {}
void start() {
timepoint_start = steady_clock::now();
cpu_time_start = (kind == THREAD) ? getProcessorTimeThread() : getProcessorTimeProcess();
}
void end() {
timepoint_end = steady_clock::now();
cpu_time_end = (kind == THREAD) ? getProcessorTimeThread() : getProcessorTimeProcess();
}
double getCPUUtilization() const {
return (cpu_time_end - cpu_time_start) / toDoubleSeconds(timepoint_end - timepoint_start) * 100.;
}
ThreadStatistics(const ThreadStatistics& other) = default;
ThreadStatistics& operator=(const ThreadStatistics& other) = default;
double getCPUTime() const { return cpu_time_end - cpu_time_start; }
double getCPUUtilization() { return cpu_utilization; }
void setCPUUtilization(double cpu_utilization) { this->cpu_utilization = cpu_utilization; }
double getTotalDuration() const { return toDoubleSeconds(timepoint_end - timepoint_start); }
};
void combine(const ThreadStatistics& other) { cpu_utilization += other.cpu_utilization; }
class alignas(64) ThreadStatistics {
CPUUtilizationTimer thread_timer;
public:
ThreadStatistics() : thread_timer(THREAD) {}
void startThreadTimer() { thread_timer.start(); }
void endThreadTimer() { thread_timer.end(); }
double getThreadCPUUtilization() const { return thread_timer.getCPUUtilization(); }
double getCPUTime() const { return thread_timer.getCPUTime(); }
double getTotalDuration() const { return thread_timer.getTotalDuration(); }
};
class alignas(64) ProcessStatistics {
double total_cpu_utilization{ 0.0 };
double fdb_network_cpu_utilization{ 0.0 };
CPUUtilizationTimer process_timer;
CPUUtilizationTimer fdb_network_timer;
public:
ProcessStatistics() noexcept {}
ProcessStatistics() : process_timer(PROCESS), fdb_network_timer(THREAD) {}
void startProcessTimer() { process_timer.start(); }
void endProcessTimer() { process_timer.end(); }
ProcessStatistics(const ProcessStatistics& other) = default;
ProcessStatistics& operator=(const ProcessStatistics& other) = default;
void startFDBNetworkTimer() { fdb_network_timer.start(); }
void endFDBNetworkTimer() { fdb_network_timer.end(); }
double getTotalCPUUtilization() { return total_cpu_utilization; }
double getFDBNetworkCPUUtilization() { return fdb_network_cpu_utilization; }
double getProcessCPUUtilization() const { return process_timer.getCPUUtilization(); }
double getFDBNetworkCPUUtilization() const { return fdb_network_timer.getCPUUtilization(); }
void setCPUUtilization(double total_cpu_utilization) { this->total_cpu_utilization = total_cpu_utilization; }
void setFDBNetworkCPUUtilization(double fdb_network_cpu_utilization) {
this->fdb_network_cpu_utilization = fdb_network_cpu_utilization;
}
double getProcessCPUTime() const { return process_timer.getCPUTime(); }
double getFDBNetworkCPUTime() const { return fdb_network_timer.getCPUTime(); }
void combine(const ProcessStatistics& other) {
total_cpu_utilization += other.total_cpu_utilization;
fdb_network_cpu_utilization += other.fdb_network_cpu_utilization;
}
double getProcessTotalDuration() const { return process_timer.getTotalDuration(); }
double getFDBNetworkTotalDuration() const { return fdb_network_timer.getTotalDuration(); }
};
} // namespace mako