Refactor Mako to C++
* Refine each op into non-blocking sub-op steps returning valid or null futures (for asynchrony to come)
* Restructure op switch-case block to (op, step)-function mapping (sketched below)
* Simplify statistics update
* Simplify console output formatting
* Simplify memory management
This commit is contained in:
parent b23c51ffcc
commit 0c6578a1e5
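The first two bullets describe breaking every operation into sub-op steps and driving them from an (op, step)-function table instead of a switch-case. The large main-source diff (presumably mako.cpp, which holds the actual table) is suppressed below, so the following is only a minimal sketch of the idea. Every name in it (Transaction, Future, StepFunc, kOpSteps, runOp) is an illustrative assumption, not code from this commit:

```cpp
// Sketch only: an (op, step) -> function table in place of a switch-case.
// All names here are illustrative assumptions, not code from this commit.
#include <array>
#include <functional>
#include <optional>
#include <vector>

struct Transaction {}; // stand-in for an FDB transaction handle
struct Future {};      // stand-in for an FDB future

enum { DEMO_OP_GET, DEMO_OP_INSERT, DEMO_MAX_OP };

// A step either finishes immediately (returns nullopt) or yields a future to wait on.
using StepFunc = std::function<std::optional<Future>(Transaction&)>;

const std::array<std::vector<StepFunc>, DEMO_MAX_OP> kOpSteps = {{
    { [](Transaction&) { return std::optional<Future>{ Future{} }; } }, // GET: one read step
    { [](Transaction&) { return std::optional<Future>{}; } },           // INSERT: immediate set
}};

void runOp(int op, Transaction& tx) {
    for (const auto& step : kOpSteps[op]) {
        if (auto fut = step(tx)) {
            // block on *fut here; once the planned asynchrony lands, chain a callback instead
        }
    }
}
```

The mako.hpp hunks below add StepKind and OpDesc, which describe each op's steps so a driver like this can stay table-driven.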
File diff suppressed because it is too large
@@ -1,12 +1,17 @@
#ifndef MAKO_H
#define MAKO_H
#ifndef MAKO_HPP
#define MAKO_HPP
#pragma once

#ifndef FDB_API_VERSION
#define FDB_API_VERSION 710
#endif

#include <foundationdb/fdb_c.h>
#include <array>
#include <atomic>
#include <list>
#include <vector>
#include <string_view>
#include <fdb.hpp>
#include <pthread.h>
#include <sys/types.h>
#include <stdbool.h>
@@ -18,192 +23,337 @@
#include <limits.h>
#endif

#define VERBOSE_NONE 0
#define VERBOSE_DEFAULT 1
#define VERBOSE_ANNOYING 2
#define VERBOSE_DEBUG 3
constexpr const int VERBOSE_NONE = 0;
constexpr const int VERBOSE_DEFAULT = 1;
constexpr const int VERBOSE_ANNOYING = 2;
constexpr const int VERBOSE_DEBUG = 3;

#define MODE_INVALID -1
#define MODE_CLEAN 0
#define MODE_BUILD 1
#define MODE_RUN 2
constexpr const int MODE_INVALID = -1;
constexpr const int MODE_CLEAN = 0;
constexpr const int MODE_BUILD = 1;
constexpr const int MODE_RUN = 2;

#define FDB_SUCCESS 0
#define FDB_ERROR_RETRY -1
#define FDB_ERROR_ABORT -2
#define FDB_ERROR_CONFLICT -3

#define LAT_BLOCK_SIZE 511 /* size of each block to get detailed latency for each operation */
/* size of each block to get detailed latency for each operation */
constexpr const size_t LAT_BLOCK_SIZE = 511;

/* transaction specification */
enum Operations {
    OP_GETREADVERSION,
    OP_GET,
    OP_GETRANGE,
    OP_SGET,
    OP_SGETRANGE,
    OP_UPDATE,
    OP_INSERT,
    OP_INSERTRANGE,
    OP_OVERWRITE,
    OP_CLEAR,
    OP_SETCLEAR,
    OP_CLEARRANGE,
    OP_SETCLEARRANGE,
    OP_COMMIT,
    OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
    OP_READ_BG,
    MAX_OP /* must be the last item */
    OP_GETREADVERSION,
    OP_GET,
    OP_GETRANGE,
    OP_SGET,
    OP_SGETRANGE,
    OP_UPDATE,
    OP_INSERT,
    OP_INSERTRANGE,
    OP_OVERWRITE,
    OP_CLEAR,
    OP_SETCLEAR,
    OP_CLEARRANGE,
    OP_SETCLEARRANGE,
    OP_COMMIT,
    OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
    OP_READ_BG,
    MAX_OP /* must be the last item */
};

#define OP_COUNT 0
#define OP_RANGE 1
#define OP_REVERSE 2
constexpr const int OP_COUNT = 0;
constexpr const int OP_RANGE = 1;
constexpr const int OP_REVERSE = 2;

/* for long arguments */
enum Arguments {
    ARG_KEYLEN,
    ARG_VALLEN,
    ARG_TPS,
    ARG_COMMITGET,
    ARG_SAMPLING,
    ARG_VERSION,
    ARG_KNOBS,
    ARG_FLATBUFFERS,
    ARG_LOGGROUP,
    ARG_PREFIXPADDING,
    ARG_TRACE,
    ARG_TRACEPATH,
    ARG_TRACEFORMAT,
    ARG_TPSMAX,
    ARG_TPSMIN,
    ARG_TPSINTERVAL,
    ARG_TPSCHANGE,
    ARG_TXNTRACE,
    ARG_TXNTAGGING,
    ARG_TXNTAGGINGPREFIX,
    ARG_STREAMING_MODE,
    ARG_DISABLE_RYW,
    ARG_CLIENT_THREADS_PER_VERSION,
    ARG_JSON_REPORT,
    ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
    ARG_KEYLEN,
    ARG_VALLEN,
    ARG_TPS,
    ARG_COMMITGET,
    ARG_SAMPLING,
    ARG_VERSION,
    ARG_KNOBS,
    ARG_FLATBUFFERS,
    ARG_LOGGROUP,
    ARG_PREFIXPADDING,
    ARG_TRACE,
    ARG_TRACEPATH,
    ARG_TRACEFORMAT,
    ARG_TPSMAX,
    ARG_TPSMIN,
    ARG_TPSINTERVAL,
    ARG_TPSCHANGE,
    ARG_TXNTRACE,
    ARG_TXNTAGGING,
    ARG_TXNTAGGINGPREFIX,
    ARG_STREAMING_MODE,
    ARG_DISABLE_RYW,
    ARG_CLIENT_THREADS_PER_VERSION,
    ARG_JSON_REPORT,
    ARG_BG_FILE_PATH // if blob granule files are stored locally, mako will read and materialize them if this is set
};

enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };

#define KEYPREFIX "mako"
#define KEYPREFIXLEN 4

#define TEMP_DATA_STORE "/tmp/makoTemp"

/* we set mako_txnspec_t and mako_args_t only once in the master process,
 * and won't be touched by child processes.
 */

typedef struct {
    /* for each operation, it stores "count", "range" and "reverse" */
    int ops[MAX_OP][3];
    /* for each operation, it stores "count", "range" and "reverse" */
    int ops[MAX_OP][3];
} mako_txnspec_t;

#define LOGGROUP_MAX 256
#define KNOB_MAX 256
#define TAGPREFIXLENGTH_MAX 8
#define NUM_CLUSTERS_MAX 3
#define NUM_DATABASES_MAX 10
#define MAX_BG_IDS 1000
constexpr const int LOGGROUP_MAX = 256;
constexpr const int KNOB_MAX = 256;
constexpr const int TAGPREFIXLENGTH_MAX = 8;
constexpr const int NUM_CLUSTERS_MAX = 3;
constexpr const int NUM_DATABASES_MAX = 10;
constexpr const int MAX_BG_IDS = 1000;

/* benchmark parameters */
typedef struct {
    int api_version;
    int json;
    int num_processes;
    int num_threads;
    int mode;
    int rows; /* is 2 billion enough? */
    int seconds;
    int iteration;
    int tpsmax;
    int tpsmin;
    int tpsinterval;
    int tpschange;
    int sampling;
    int key_length;
    int value_length;
    int zipf;
    int commit_get;
    int verbose;
    mako_txnspec_t txnspec;
    char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
    int num_fdb_clusters;
    int num_databases;
    char log_group[LOGGROUP_MAX];
    int prefixpadding;
    int trace;
    char tracepath[PATH_MAX];
    int traceformat; /* 0 - XML, 1 - JSON */
    char knobs[KNOB_MAX];
    uint8_t flatbuffers;
    int txntrace;
    int txntagging;
    char txntagging_prefix[TAGPREFIXLENGTH_MAX];
    FDBStreamingMode streaming_mode;
    int client_threads_per_version;
    int disable_ryw;
    char json_output_path[PATH_MAX];
    bool bg_materialize_files;
    char bg_file_path[PATH_MAX];
} mako_args_t;
struct mako_args_t {
    int api_version;
    int json;
    int num_processes;
    int num_threads;
    int mode;
    int rows; /* is 2 billion enough? */
    int row_digits;
    int seconds;
    int iteration;
    int tpsmax;
    int tpsmin;
    int tpsinterval;
    int tpschange;
    int sampling;
    int key_length;
    int value_length;
    int zipf;
    int commit_get;
    int verbose;
    mako_txnspec_t txnspec;
    char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
    int num_fdb_clusters;
    int num_databases;
    char log_group[LOGGROUP_MAX];
    int prefixpadding;
    int trace;
    char tracepath[PATH_MAX];
    int traceformat; /* 0 - XML, 1 - JSON */
    char knobs[KNOB_MAX];
    uint8_t flatbuffers;
    int txntrace;
    int txntagging;
    char txntagging_prefix[TAGPREFIXLENGTH_MAX];
    FDBStreamingMode streaming_mode;
    int64_t client_threads_per_version;
    int disable_ryw;
    char json_output_path[PATH_MAX];
    bool bg_materialize_files;
    char bg_file_path[PATH_MAX];
};

/* shared memory */
#define SIGNAL_RED 0
#define SIGNAL_GREEN 1
#define SIGNAL_OFF 2
constexpr const int SIGNAL_RED = 0;
constexpr const int SIGNAL_GREEN = 1;
constexpr const int SIGNAL_OFF = 2;

typedef struct {
    int signal;
    int readycount;
    double throttle_factor;
    int stopcount;
} mako_shmhdr_t;
struct mako_shmhdr_t {
    std::atomic<int> signal;
    std::atomic<int> readycount;
    std::atomic<double> throttle_factor;
    std::atomic<int> stopcount;
};

/* memory block allocated to each operation when collecting detailed latency */
typedef struct {
    uint64_t data[LAT_BLOCK_SIZE];
    void* next_block;
} lat_block_t;
class alignas(64) mako_stats_t {
    uint64_t xacts;
    uint64_t conflicts;
    uint64_t total_errors;
    uint64_t ops[MAX_OP];
    uint64_t errors[MAX_OP];
    uint64_t latency_samples[MAX_OP];
    uint64_t latency_us_total[MAX_OP];
    uint64_t latency_us_min[MAX_OP];
    uint64_t latency_us_max[MAX_OP];
public:
    mako_stats_t() noexcept {
        memset(this, 0, sizeof(mako_stats_t));
        /* minimums must start at the largest value so the first recorded sample replaces them */
        memset(latency_us_min, 0xff, sizeof(latency_us_min));
    }

typedef struct {
    uint64_t xacts;
    uint64_t conflicts;
    uint64_t ops[MAX_OP];
    uint64_t errors[MAX_OP];
    uint64_t latency_samples[MAX_OP];
    uint64_t latency_us_total[MAX_OP];
    uint64_t latency_us_min[MAX_OP];
    uint64_t latency_us_max[MAX_OP];
} mako_stats_t;
    mako_stats_t(const mako_stats_t& other) noexcept = default;
    mako_stats_t& operator=(const mako_stats_t& other) noexcept = default;

    uint64_t get_tx_count() const noexcept {
        return xacts;
    }

    uint64_t get_conflict_count() const noexcept {
        return conflicts;
    }

    uint64_t get_op_count(int op) const noexcept {
        return ops[op];
    }

    uint64_t get_error_count(int op) const noexcept {
        return errors[op];
    }

    uint64_t get_total_error_count() const noexcept {
        return total_errors;
    }

    uint64_t get_latency_sample_count(int op) const noexcept {
        return latency_samples[op];
    }

    uint64_t get_latency_us_total(int op) const noexcept {
        return latency_us_total[op];
    }

    uint64_t get_latency_us_min(int op) const noexcept {
        return latency_us_min[op];
    }

    uint64_t get_latency_us_max(int op) const noexcept {
        return latency_us_max[op];
    }

    // with 'this' as final aggregation, factor in 'other'
    void combine(const mako_stats_t& other) {
        xacts += other.xacts;
        conflicts += other.conflicts;
        for (auto op = 0; op < MAX_OP; op++) {
            ops[op] += other.ops[op];
            errors[op] += other.errors[op];
            total_errors += other.errors[op];
            latency_samples[op] += other.latency_samples[op];
            latency_us_total[op] += other.latency_us_total[op];
            if (latency_us_min[op] > other.latency_us_min[op])
                latency_us_min[op] = other.latency_us_min[op];
            if (latency_us_max[op] < other.latency_us_max[op])
                latency_us_max[op] = other.latency_us_max[op];
        }
    }

    void incr_tx_count() noexcept { xacts++; }
    void incr_conflict_count() noexcept { conflicts++; }

    // non-commit write operations aren't measured for time.
    void incr_count_immediate(int op) noexcept { ops[op]++; }

    void incr_error_count(int op) noexcept {
        total_errors++;
        errors[op]++;
    }

    void add_latency(int op, uint64_t latency_us) noexcept {
        latency_samples[op]++;
        latency_us_total[op] += latency_us;
        if (latency_us_min[op] > latency_us)
            latency_us_min[op] = latency_us;
        if (latency_us_max[op] < latency_us)
            latency_us_max[op] = latency_us;
        ops[op]++;
    }
};
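For the "Simplify statistics update" bullet: each thread keeps its own mako_stats_t and the final report folds them together with combine(). A minimal sketch, assuming a std::vector of per-thread stats (the function and parameter names are illustrative, not from this commit):

```cpp
// Sketch only: end-of-run aggregation across per-thread stats.
#include <vector>
#include <fmt/format.h>

inline void report_totals(const std::vector<mako_stats_t>& per_thread) {
    mako_stats_t total; // constructor zeroes the counters
    for (const auto& s : per_thread)
        total.combine(s); // fold each worker thread's counters into the total
    fmt::print("txs: {}  conflicts: {}  errors: {}\n",
               total.get_tx_count(),
               total.get_conflict_count(),
               total.get_total_error_count());
}
```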

/* per-process information */
typedef struct {
    int worker_id;
    pid_t parent_id;
    mako_args_t* args;
    mako_shmhdr_t* shm;
    FDBDatabase* databases[NUM_DATABASES_MAX];
    int worker_id;
    pid_t parent_id;
    mako_args_t* args;
    mako_shmhdr_t* shm;
    std::vector<fdb::Database> databases;
} process_info_t;

/* memory block allocated to each operation when collecting detailed latency */
class lat_block_t {
    uint64_t samples[LAT_BLOCK_SIZE]{0, };
    uint32_t index{0};
public:
    lat_block_t() noexcept = default;
    bool full() const noexcept { return index >= LAT_BLOCK_SIZE; }
    void put(uint64_t point) {
        assert(!full());
        samples[index++] = point;
    }
    // return {data block, number of samples}
    std::pair<uint64_t const*, size_t> data() const noexcept {
        return {samples, index};
    }
};

/* collect sampled latencies */
class sample_bin {
    std::list<lat_block_t> blocks;
public:
    void reserve_one() {
        if (blocks.empty())
            blocks.emplace_back();
    }

    void put(uint64_t latency_us) {
        if (blocks.empty() || blocks.back().full())
            blocks.emplace_back();
        blocks.back().put(latency_us);
    }

    // iterate & apply for each block user function void(uint64_t const*, size_t)
    template <typename Func>
    void for_each_block(Func&& fn) const {
        for (const auto& block : blocks) {
            auto [ptr, cnt] = block.data();
            fn(ptr, cnt);
        }
    }
};

using sample_bin_array_t = std::array<sample_bin, MAX_OP>;
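A short sketch of how these latency containers might be used together; the sample values and the function name are illustrative assumptions:

```cpp
// Sketch only: record two GET latencies (in microseconds) and drain them per block.
#include <cstdint>
#include <fmt/format.h>

inline void demo_latency_sampling() {
    sample_bin_array_t bins; // one bin per operation
    bins[OP_GET].put(120);
    bins[OP_GET].put(95);
    // visit every stored block: fn receives {pointer to samples, sample count}
    bins[OP_GET].for_each_block([](const uint64_t* data, size_t count) {
        for (size_t i = 0; i < count; i++)
            fmt::print("{}\n", data[i]);
    });
}
```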
/* args for threads */
typedef struct {
struct alignas(64) thread_args_t {
    int thread_id;
    int database_index; // index of the database to do work to
    int elem_size[MAX_OP]; /* stores the multiple of LAT_BLOCK_SIZE to check the memory allocation of each operation */
    bool is_memory_allocated[MAX_OP]; /* flag specified for each operation, whether the memory was allocated to that
                                         specific operation */
    lat_block_t* block[MAX_OP];
    sample_bin_array_t sample_bins;
    process_info_t* process;
} thread_args_t;
};

/* process type */
typedef enum { proc_master = 0, proc_worker, proc_stats } proc_type_t;

#endif /* MAKO_H */
// determines how resultant future will be handled
enum class StepKind {
    NONE, ///< not part of the table: OP_TRANSACTION, OP_COMMIT
    IMM, ///< non-future ops that return immediately: e.g. set, clear_range
    READ, ///< blockable reads: get(), get_range(), get_read_version, ...
    COMMIT, ///< self-explanatory
    ON_ERROR ///< future is a result of tx.on_error()
};

class OpDesc {
    std::string_view name_;
    std::vector<StepKind> steps_;
    bool needs_commit_;
public:
    OpDesc(std::string_view name, std::vector<StepKind>&& steps, bool needs_commit) :
        name_(name), steps_(std::move(steps)), needs_commit_(needs_commit)
    {}

    std::string_view name() const noexcept {
        return name_;
    }
    // what kind of step is this?
    StepKind step_kind(int step) const noexcept {
        assert(step < steps());
        return steps_[step];
    }
    // how many steps in this op?
    int steps() const noexcept {
        return static_cast<int>(steps_.size());
    }
    // does the op need to commit some time after its final step?
    bool needs_commit() const noexcept {
        return needs_commit_;
    }
};

#endif /* MAKO_HPP */
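To connect StepKind and OpDesc back to the "(op, step)-function mapping" bullet: a hypothetical descriptor table for a few ops. The step breakdown below is an illustrative assumption; the real table lives in the suppressed mako.cpp diff.

```cpp
// Sketch only: a possible op descriptor table built from OpDesc/StepKind above.
#include <array>

inline const std::array<OpDesc, 3> kDemoOpTable = {{
    OpDesc{ "GET", { StepKind::READ }, false },
    OpDesc{ "INSERT", { StepKind::IMM }, true },
    OpDesc{ "SETCLEAR", { StepKind::IMM, StepKind::IMM }, true },
}};
// A driver can then ask kDemoOpTable[op].step_kind(step) whether to wait on a
// future, commit, or move straight to the next step.
```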
@@ -1,9 +1,10 @@
#include "utils.h"
#include "mako.h"
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "utils.hpp"
#include "mako.hpp"
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fmt/format.h>

/* uniform-distribution random */
int urand(int low, int high) {
@@ -12,26 +13,6 @@ int urand(int low, int high) {
    return (int)((r * range) + low);
}

/* random string */
/* len is the buffer size, must include null */
void randstr(char* str, int len) {
    int i;
    for (i = 0; i < len - 1; i++) {
        str[i] = '!' + urand(0, 'z' - '!'); /* generate a char from '!' to 'z' */
    }
    str[len - 1] = '\0';
}

/* random numeric string */
/* len is the buffer size, must include null */
void randnumstr(char* str, int len) {
    int i;
    for (i = 0; i < len - 1; i++) {
        str[i] = '0' + urand(0, 9); /* generate a char from '0' to '9' */
    }
    str[len - 1] = '\0';
}

/* return the first key to be inserted */
int insert_begin(int rows, int p_idx, int t_idx, int total_p, int total_t) {
    double interval = (double)rows / total_p / total_t;
@@ -66,71 +47,3 @@ int digits(int num) {
    }
    return digits;
}

/* generate a key for a given key number */
/* prefix is "mako" by default, prefixpadding = 1 means 'x' will be in front rather than trailing the keyname */
/* len is the buffer size, key length + null */
void genkey(char* str, char* prefix, int prefixlen, int prefixpadding, int num, int rows, int len) {
    const int rowdigit = digits(rows);
    const int prefixoffset = prefixpadding ? len - (prefixlen + rowdigit) - 1 : 0;
    char* prefixstr = (char*)alloca(sizeof(char) * (prefixlen + rowdigit + 1));
    snprintf(prefixstr, prefixlen + rowdigit + 1, "%s%0.*d", prefix, rowdigit, num);
    memset(str, 'x', len);
    memcpy(str + prefixoffset, prefixstr, prefixlen + rowdigit);
    str[len - 1] = '\0';
}

/* This is another sorting algorithm used to calculate latency parameters */
/* We moved from radix sort to quick sort to avoid extra space used in radix sort */

#if 0
uint64_t get_max(uint64_t arr[], int n) {
    uint64_t mx = arr[0];
    for (int i = 1; i < n; i++) {
        if (arr[i] > mx) {
            mx = arr[i];
        }
    }
    return mx;
}

void bucket_data(uint64_t arr[], int n, uint64_t exp) {
    // uint64_t output[n];
    int i, count[10] = { 0 };
    uint64_t* output = (uint64_t*)malloc(sizeof(uint64_t) * n);

    for (i = 0; i < n; i++) {
        count[(arr[i] / exp) % 10]++;
    }
    for (i = 1; i < 10; i++) {
        count[i] += count[i - 1];
    }
    for (i = n - 1; i >= 0; i--) {
        output[count[(arr[i] / exp) % 10] - 1] = arr[i];
        count[(arr[i] / exp) % 10]--;
    }
    for (i = 0; i < n; i++) {
        arr[i] = output[i];
    }
    free(output);
}

// The main function is to sort arr[] of size n using Radix Sort
void radix_sort(uint64_t* arr, int n) {
    // Find the maximum number to know number of digits
    uint64_t m = get_max(arr, n);
    for (uint64_t exp = 1; m / exp > 0; exp *= 10) bucket_data(arr, n, exp);
}
#endif

int compare(const void* a, const void* b) {
    const uint64_t* da = (const uint64_t*)a;
    const uint64_t* db = (const uint64_t*)b;

    return (*da > *db) - (*da < *db);
}

// The main function is to sort arr[] of size n using Quick Sort
void quick_sort(uint64_t* arr, int n) {
    qsort(arr, n, sizeof(uint64_t), compare);
}
@@ -1,22 +1,40 @@
#ifndef UTILS_H
#define UTILS_H
#ifndef UTILS_HPP
#define UTILS_HPP
#pragma once

#include "mako.hpp"
#include <cassert>
#include <chrono>
#include <fmt/format.h>
#include <stdint.h>

/* uniform-distribution random */
/* return a uniform random number between low and high, both inclusive */
int urand(int low, int high);

/* write a random string of the length of (len-1) to memory pointed by str
 * with a null-termination character at str[len-1].
 */
void randstr(char* str, int len);
/* random string */
template <bool Clear=true, typename Char>
void randstr(std::basic_string<Char>& str, int len) {
    if constexpr (Clear)
        str.clear();
    assert(len >= 0);
    str.reserve(str.size() + static_cast<size_t>(len));
    for (auto i = 0; i < len; i++) {
        str.push_back('!' + urand(0, 'z' - '!')); /* generate a char from '!' to 'z' */
    }
}

/* write a random numeric string of the length of (len-1) to memory pointed by str
 * with a null-termination character at str[len-1].
 */
void randnumstr(char* str, int len);
/* random numeric string */
template <bool Clear=true, typename Char>
void randnumstr(std::basic_string<Char>& str, int len) {
    if constexpr (Clear)
        str.clear();
    assert(len >= 0);
    str.reserve(str.size() + static_cast<size_t>(len));
    for (auto i = 0; i < len; i++) {
        str.push_back('0' + urand(0, 9)); /* generate a char from '0' to '9' */
    }
}

/* given the total number of rows to be inserted,
 * the worker process index p_idx and the thread index t_idx (both 0-based),
@@ -46,20 +64,121 @@ int compute_thread_portion(int val, int p_idx, int t_idx, int total_p, int total
/* get the number of digits */
int digits(int num);

/* fill (str) with configured key prefix: i.e. non-numeric part
 * (str) is appended with concat([padding], PREFIX)
 */
template <bool Clear=true, typename Char>
void genkeyprefix(std::basic_string<Char>& str,
                  std::string_view prefix,
                  mako_args_t const& args) {
    // concat('x' * padding_len, key_prefix)
    if constexpr (Clear)
        str.clear();
    const auto padding_len = args.prefixpadding ?
        (args.key_length - args.row_digits - static_cast<int>(prefix.size())) : 0;
    assert(padding_len >= 0);
    str.reserve(str.size() + padding_len + prefix.size());
    fmt::format_to(std::back_inserter(str),
                   "{0:x>{1}}{2}", "", padding_len, prefix);
}

/* generate a key for a given key number */
/* prefix is "mako" by default, prefixpadding = 1 means 'x' will be in front rather than trailing the keyname */
/* len is the buffer size, key length + null */
void genkey(char* str, char* prefix, int prefixlen, int prefixpadding, int num, int rows, int len);
template <bool Clear=true, typename Char>
void genkey(std::basic_string<Char>& str,
            std::string_view prefix,
            mako_args_t const& args, int num) {
    static_assert(sizeof(Char) == 1);
    const auto pad_len = args.prefixpadding ?
        args.key_length - (static_cast<int>(prefix.size()) + args.row_digits) : 0;
    assert(pad_len >= 0);
    if constexpr (Clear)
        str.clear();
    str.reserve(str.size() + static_cast<size_t>(args.key_length));
    fmt::format_to(std::back_inserter(str),
                   "{0:x>{1}}{2}{3:0{4}d}{5:x>{6}}",
                   "", pad_len,
                   prefix,
                   num, args.row_digits,
                   "", args.key_length - pad_len - static_cast<int>(prefix.size()) - args.row_digits);
}
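To make the format string above concrete, a hedged example of the resulting keys, assuming key_length = 16, row_digits = 6, and the default "mako" prefix (the argument values are chosen purely for illustration):

```cpp
// Sketch only: expected key shapes for hypothetical argument values.
#include <string>

inline void demo_genkey() {
    mako_args_t args{};
    args.key_length = 16;
    args.row_digits = 6;
    args.prefixpadding = 0;
    std::string key;
    genkey(key, "mako", args, 42); // key == "mako000042xxxxxx" (padding trails)
    args.prefixpadding = 1;
    genkey(key, "mako", args, 42); // key == "xxxxxxmako000042" (padding leads)
}
```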

#if 0
// The main function is to sort arr[] of size n using Radix Sort
void radix_sort(uint64_t arr[], int n);
void bucket_data(uint64_t arr[], int n, uint64_t exp);
uint64_t get_max(uint64_t arr[], int n);
#endif
// invoke user-provided callable when object goes out of scope.
template <typename Func>
class exit_guard {
    std::decay_t<Func> fn;
public:
    exit_guard(Func&& fn) : fn(std::forward<Func>(fn)) {}

// The main function is to sort arr[] of size n using Quick Sort
void quick_sort(uint64_t arr[], int n);
int compare(const void* a, const void* b);
    ~exit_guard() {
        fn();
    }
};

#endif /* UTILS_H */
// invoke user-provided callable when stack unwinds by exception.
template <typename Func>
class fail_guard {
    std::decay_t<Func> fn;
public:
    fail_guard(Func&& fn) : fn(std::forward<Func>(fn)) {}

    ~fail_guard() {
        if (std::uncaught_exceptions()) {
            fn();
        }
    }
};
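A brief usage sketch for the two guards; the file path and the elided work are illustrative assumptions:

```cpp
// Sketch only: always close the file, but remove it only if an exception unwinds.
#include <cstdio>

inline void demo_guards() {
    std::FILE* f = std::fopen("/tmp/mako_demo", "w"); // hypothetical resource
    auto drop_partial = fail_guard([]() { std::remove("/tmp/mako_demo"); }); // runs only while unwinding
    auto close_file = exit_guard([&f]() { if (f) std::fclose(f); });         // always runs (and runs first)
    // ... work that may throw ...
}
```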

// timing helpers
using std::chrono::steady_clock;
using timepoint_t = decltype(steady_clock::now());

template <typename Duration>
double to_double_seconds(Duration duration) {
    return std::chrono::duration_cast<std::chrono::duration<double>>(duration).count();
}

template <typename Duration>
uint64_t to_integer_seconds(Duration duration) {
    return std::chrono::duration_cast<std::chrono::duration<uint64_t>>(duration).count();
}

template <typename Duration>
uint64_t to_integer_microseconds(Duration duration) {
    return std::chrono::duration_cast<std::chrono::duration<uint64_t, std::micro>>(duration).count();
}
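A sketch tying the timing helpers back to the stats and sampling containers declared in mako.hpp; the function name and the elided operation are illustrative:

```cpp
// Sketch only: time one operation and feed the elapsed microseconds to the stats.
inline void time_one_get(mako_stats_t& stats, sample_bin_array_t& bins) {
    const auto start = steady_clock::now();
    // ... perform the GET here ...
    const auto latency_us = to_integer_microseconds(steady_clock::now() - start);
    stats.add_latency(OP_GET, latency_us);
    bins[OP_GET].put(latency_us);
}
```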

// trace helpers
constexpr const int STATS_TITLE_WIDTH = 12;
constexpr const int STATS_FIELD_WIDTH = 12;

template <typename Value>
void put_title(Value&& value) {
    fmt::print("{0: <{1}} ", std::forward<Value>(value), STATS_TITLE_WIDTH);
}

template <typename Value>
void put_title_r(Value&& value) {
    fmt::print("{0: >{1}} ", std::forward<Value>(value), STATS_TITLE_WIDTH);
}

inline void put_title_bar() {
    fmt::print("{0:=<{1}} ", "", STATS_TITLE_WIDTH);
}

template <typename Value>
void put_field(Value&& value) {
    fmt::print("{0: >{1}} ", std::forward<Value>(value), STATS_FIELD_WIDTH);
}

inline void put_field_bar() {
    fmt::print("{0:=>{1}} ", "", STATS_FIELD_WIDTH);
}

template <typename Value>
void put_field_f(Value&& value, int precision) {
    fmt::print("{0: >{1}.{2}f} ",
               std::forward<Value>(value), STATS_FIELD_WIDTH, precision);
}

#endif /* UTILS_HPP */
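For the "Simplify console output formatting" bullet, a sketch of how the put_* helpers could print a small stats table; the column choice is illustrative:

```cpp
// Sketch only: a header row, a separator row, and one data row.
inline void print_demo_row(const mako_stats_t& stats) {
    put_title("OP");
    put_field("Total");
    put_field("Errors");
    fmt::print("\n");
    put_title_bar();
    put_field_bar();
    put_field_bar();
    fmt::print("\n");
    put_title("GET");
    put_field(stats.get_op_count(OP_GET));
    put_field(stats.get_error_count(OP_GET));
    fmt::print("\n");
}
```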