diff --git a/bindings/c/test/mako/mako.c b/bindings/c/test/mako/mako.c index cf60ae5848..db5a995061 100755 --- a/bindings/c/test/mako/mako.c +++ b/bindings/c/test/mako/mako.c @@ -8,6 +8,7 @@ #include #include #include +#include #if defined(__linux__) #include @@ -643,7 +644,8 @@ int run_transaction(FDBTransaction *transaction, mako_args_t *args, return 0; } -int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps, +int run_workload(FDBTransaction *transaction, mako_args_t *args, + int thread_tps, volatile double *throttle_factor, int thread_iters, volatile int *signal, mako_stats_t *stats) { int xacts = 0; int rc = 0; @@ -651,10 +653,13 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps, char *keystr; char *keystr2; char *valstr; + int current_tps; if (thread_tps < 0) return 0; + current_tps = (int)((double)thread_tps * *throttle_factor); + keystr = (char *)malloc(sizeof(char) * args->key_length + 1); if (!keystr) return -1; @@ -674,7 +679,7 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps, while (1) { - if ((thread_tps > 0) && (xacts >= thread_tps)) { + if ((thread_tps > 0) && (xacts >= current_tps)) { /* throttling is on */ clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now); @@ -685,6 +690,8 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps, xacts = 0; timer_prev.tv_sec = timer_now.tv_sec; timer_prev.tv_nsec = timer_now.tv_nsec; + /* update throttle rate */ + current_tps = (int)((double)thread_tps * *throttle_factor); } else { /* 1 second not passed, throttle */ usleep(1000); @@ -727,6 +734,7 @@ void *worker_thread(void *thread_args) { int thread_iters = 0; int op; volatile int *signal = &((thread_args_t *)thread_args)->process->shm->signal; + volatile double *throttle_factor = &((thread_args_t *)thread_args)->process->shm->throttle_factor; volatile int *readycount = &((thread_args_t *)thread_args)->process->shm->readycount; mako_stats_t *stats = @@ -747,8 +755,8 @@ void *worker_thread(void *thread_args) { (unsigned int)pthread_self()); } - if (args->tps) { - thread_tps = compute_thread_tps(args->tps, worker_id, thread_id, + if (args->tpsmax) { + thread_tps = compute_thread_tps(args->tpsmax, worker_id, thread_id, args->num_processes, args->num_threads); } @@ -785,8 +793,8 @@ void *worker_thread(void *thread_args) { /* run the workload */ else if (args->mode == MODE_RUN) { - rc = run_workload(transaction, args, thread_tps, thread_iters, signal, - stats); + rc = run_workload(transaction, args, thread_tps, throttle_factor, + thread_iters, signal, stats); if (rc < 0) { fprintf(stderr, "ERROR: run_workload failed\n"); } @@ -1014,7 +1022,10 @@ int init_args(mako_args_t *args) { args->rows = 10000; args->seconds = 0; args->iteration = 0; - args->tps = 0; + args->tpsmax = 0; + args->tpsmin = -1; + args->tpsinterval = 10; + args->tpschange = TPS_SIN; args->sampling = 1000; args->key_length = 16; args->value_length = 16; @@ -1172,7 +1183,10 @@ void usage() { printf("%-24s%s\n", " --keylen=LENGTH", "Specify the key lengths"); printf("%-24s%s\n", " --vallen=LENGTH", "Specify the value lengths"); printf("%-24s%s\n", "-x, --transaction=SPEC", "Transaction specification"); - printf("%-24s%s\n", " --tps=TPS", "Specify the target TPS"); + printf("%-24s%s\n", " --tps|--tpsmax=TPS", "Specify the target max TPS"); + printf("%-24s%s\n", " --tpsmin=TPS", "Specify the target min TPS"); + printf("%-24s%s\n", " --tpsinterval=SEC", "Specify the TPS change interval (Default: 10 seconds)"); + printf("%-24s%s\n", " --tpschange=", "Specify the TPS change type (Default: sin)"); printf("%-24s%s\n", " --sampling=RATE", "Specify the sampling rate for latency stats"); printf("%-24s%s\n", "-m, --mode=MODE", @@ -1205,6 +1219,10 @@ int parse_args(int argc, char *argv[], mako_args_t *args) { {"vallen", required_argument, NULL, ARG_VALLEN}, {"transaction", required_argument, NULL, 'x'}, {"tps", required_argument, NULL, ARG_TPS}, + {"tpsmax", required_argument, NULL, ARG_TPSMAX}, + {"tpsmin", required_argument, NULL, ARG_TPSMIN}, + {"tpsinterval", required_argument, NULL, ARG_TPSINTERVAL}, + {"tpschange", required_argument, NULL, ARG_TPSCHANGE}, {"sampling", required_argument, NULL, ARG_SAMPLING}, {"verbose", required_argument, NULL, 'v'}, {"mode", required_argument, NULL, 'm'}, @@ -1273,7 +1291,26 @@ int parse_args(int argc, char *argv[], mako_args_t *args) { args->value_length = atoi(optarg); break; case ARG_TPS: - args->tps = atoi(optarg); + case ARG_TPSMAX: + args->tpsmax = atoi(optarg); + break; + case ARG_TPSMIN: + args->tpsmin = atoi(optarg); + break; + case ARG_TPSINTERVAL: + args->tpsinterval = atoi(optarg); + break; + case ARG_TPSCHANGE: + if (strcmp(optarg, "sin") == 0) + args->tpschange = TPS_SIN; + else if (strcmp(optarg, "square") == 0) + args->tpschange = TPS_SQUARE; + else if (strcmp(optarg, "pulse") == 0) + args->tpschange = TPS_PULSE; + else { + fprintf(stderr, "--tpschange must be sin, square or pulse\n"); + return -1; + } break; case ARG_SAMPLING: args->sampling = atoi(optarg); @@ -1300,6 +1337,9 @@ int parse_args(int argc, char *argv[], mako_args_t *args) { break; } } + if ((args->tpsmin == -1) || (args->tpsmin > args->tpsmax)) { + args->tpsmin = args->tpsmax; + } return 0; } @@ -1529,7 +1569,19 @@ void print_report(mako_args_t *args, mako_stats_t *stats, (double)durationns / 1000000000); printf("Total Processes: %8d\n", args->num_processes); printf("Total Threads: %8d\n", args->num_threads); - printf("Target TPS: %8d\n", args->tps); + if (args->tpsmax == args->tpsmin) + printf("Target TPS: %8d\n", args->tpsmax); + else { + printf("Target TPS (MAX): %8d\n", args->tpsmax); + printf("Target TPS (MIN): %8d\n", args->tpsmin); + printf("TPS Interval: %8d\n", args->tpsinterval); + printf("TPS Change: "); + switch (args->tpschange) { + case TPS_SIN: printf("%8s\n", "SIN"); break; + case TPS_SQUARE: printf("%8s\n", "SQUARE"); break; + case TPS_PULSE: printf("%8s\n", "PULSE"); break; + } + } printf("Total Xacts: %8lld\n", totalxacts); printf("Total Conflicts: %8lld\n", conflicts); printf("Total Errors: %8lld\n", totalerrors); @@ -1603,8 +1655,9 @@ void print_report(mako_args_t *args, mako_stats_t *stats, } int stats_process_main(mako_args_t *args, mako_stats_t *stats, - volatile int *signal) { + volatile double *throttle_factor, volatile int *signal) { struct timespec timer_start, timer_prev, timer_now; + double sin_factor; /* wait until the signal turn on */ while (*signal == SIGNAL_OFF) { @@ -1620,13 +1673,45 @@ int stats_process_main(mako_args_t *args, mako_stats_t *stats, while (*signal != SIGNAL_RED) { usleep(100000); /* sleep for 100ms */ clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now); - /* roughly 1 sec */ + + /* print stats every (roughly) 1 sec */ if (timer_now.tv_sec > timer_prev.tv_sec) { + + /* adjust throttle rate if needed */ + if (args->tpsmax != args->tpsmin) { + /* set the throttle factor between 0.0 and 1.0 */ + switch (args->tpschange) { + case TPS_SIN: + sin_factor = sin((timer_now.tv_sec % args->tpsinterval) / (double)args->tpsinterval * M_PI * 2) / 2.0 + 0.5; + *throttle_factor = 1 - (sin_factor * (1.0 - ((double)args->tpsmin / args->tpsmax))); + break; + case TPS_SQUARE: + if (timer_now.tv_sec % args->tpsinterval < (args->tpsinterval / 2)) { + /* set to max */ + *throttle_factor = 1.0; + } else { + /* set to min */ + *throttle_factor = (double)args->tpsmin / (double)args->tpsmax; + } + break; + case TPS_PULSE: + if (timer_now.tv_sec % args->tpsinterval == 0) { + /* set to max */ + *throttle_factor = 1.0; + } else { + /* set to min */ + *throttle_factor = (double)args->tpsmin / (double)args->tpsmax; + } + break; + } + } + if (args->verbose >= VERBOSE_DEFAULT) print_stats(args, stats, &timer_now, &timer_prev); timer_prev.tv_sec = timer_now.tv_sec; timer_prev.tv_nsec = timer_now.tv_nsec; } + } /* print report */ @@ -1714,6 +1799,7 @@ int main(int argc, char *argv[]) { /* get ready */ shm->signal = SIGNAL_OFF; shm->readycount = 0; + shm->throttle_factor = 1.0; /* fork worker processes */ worker_pids = (pid_t *)calloc(sizeof(pid_t), args.num_processes); @@ -1765,7 +1851,7 @@ int main(int argc, char *argv[]) { /* no stats needed for clean mode */ exit(0); } - stats_process_main(&args, stats, &shm->signal); + stats_process_main(&args, stats, &shm->throttle_factor, &shm->signal); exit(0); } diff --git a/bindings/c/test/mako/mako.h b/bindings/c/test/mako/mako.h index b8515092df..334a8774f8 100755 --- a/bindings/c/test/mako/mako.h +++ b/bindings/c/test/mako/mako.h @@ -34,36 +34,50 @@ */ /* transaction specification */ -#define OP_GETREADVERSION 0 -#define OP_GET 1 -#define OP_GETRANGE 2 -#define OP_SGET 3 -#define OP_SGETRANGE 4 -#define OP_UPDATE 5 -#define OP_INSERT 6 -#define OP_INSERTRANGE 7 -#define OP_CLEAR 8 -#define OP_SETCLEAR 9 -#define OP_CLEARRANGE 10 -#define OP_SETCLEARRANGE 11 -#define OP_COMMIT 12 -#define MAX_OP 13 /* update this when adding a new operation */ +enum Operations { + OP_GETREADVERSION, + OP_GET, + OP_GETRANGE, + OP_SGET, + OP_SGETRANGE, + OP_UPDATE, + OP_INSERT, + OP_INSERTRANGE, + OP_CLEAR, + OP_SETCLEAR, + OP_CLEARRANGE, + OP_SETCLEARRANGE, + OP_COMMIT, + MAX_OP /* must be the last item */ +}; #define OP_COUNT 0 #define OP_RANGE 1 #define OP_REVERSE 2 /* for arguments */ -#define ARG_KEYLEN 1 -#define ARG_VALLEN 2 -#define ARG_TPS 3 -#define ARG_COMMITGET 4 -#define ARG_SAMPLING 5 -#define ARG_VERSION 6 -#define ARG_KNOBS 7 -#define ARG_FLATBUFFERS 8 -#define ARG_TRACE 9 -#define ARG_TRACEPATH 10 +enum Arguments { + ARG_KEYLEN, + ARG_VALLEN, + ARG_TPS, + ARG_COMMITGET, + ARG_SAMPLING, + ARG_VERSION, + ARG_KNOBS, + ARG_FLATBUFFERS, + ARG_TRACE, + ARG_TRACEPATH, + ARG_TPSMAX, + ARG_TPSMIN, + ARG_TPSINTERVAL, + ARG_TPSCHANGE +}; + +enum TPSChangeTypes { + TPS_SIN, + TPS_SQUARE, + TPS_PULSE +}; #define KEYPREFIX "mako" #define KEYPREFIXLEN 4 @@ -84,7 +98,10 @@ typedef struct { int rows; /* is 2 billion enough? */ int seconds; int iteration; - int tps; + int tpsmax; + int tpsmin; + int tpsinterval; + int tpschange; int sampling; int key_length; int value_length; @@ -107,6 +124,7 @@ typedef struct { typedef struct { int signal; int readycount; + double throttle_factor; } mako_shmhdr_t; typedef struct {