Merge pull request #2133 from kaomakino/kaomakino/mako

Mako benchmark:  Add variable throttling
This commit is contained in:
Vishesh Yadav 2019-10-08 15:26:52 -07:00 committed by GitHub
commit 62972b523c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 142 additions and 38 deletions

View File

@ -8,6 +8,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include <math.h>
#if defined(__linux__) #if defined(__linux__)
#include <linux/limits.h> #include <linux/limits.h>
@ -643,7 +644,8 @@ int run_transaction(FDBTransaction *transaction, mako_args_t *args,
return 0; return 0;
} }
int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps, int run_workload(FDBTransaction *transaction, mako_args_t *args,
int thread_tps, volatile double *throttle_factor,
int thread_iters, volatile int *signal, mako_stats_t *stats) { int thread_iters, volatile int *signal, mako_stats_t *stats) {
int xacts = 0; int xacts = 0;
int rc = 0; int rc = 0;
@ -651,10 +653,13 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps,
char *keystr; char *keystr;
char *keystr2; char *keystr2;
char *valstr; char *valstr;
int current_tps;
if (thread_tps < 0) if (thread_tps < 0)
return 0; return 0;
current_tps = (int)((double)thread_tps * *throttle_factor);
keystr = (char *)malloc(sizeof(char) * args->key_length + 1); keystr = (char *)malloc(sizeof(char) * args->key_length + 1);
if (!keystr) if (!keystr)
return -1; return -1;
@ -674,7 +679,7 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps,
while (1) { while (1) {
if ((thread_tps > 0) && (xacts >= thread_tps)) { if ((thread_tps > 0) && (xacts >= current_tps)) {
/* throttling is on */ /* throttling is on */
clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now); clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now);
@ -685,6 +690,8 @@ int run_workload(FDBTransaction *transaction, mako_args_t *args, int thread_tps,
xacts = 0; xacts = 0;
timer_prev.tv_sec = timer_now.tv_sec; timer_prev.tv_sec = timer_now.tv_sec;
timer_prev.tv_nsec = timer_now.tv_nsec; timer_prev.tv_nsec = timer_now.tv_nsec;
/* update throttle rate */
current_tps = (int)((double)thread_tps * *throttle_factor);
} else { } else {
/* 1 second not passed, throttle */ /* 1 second not passed, throttle */
usleep(1000); usleep(1000);
@ -727,6 +734,7 @@ void *worker_thread(void *thread_args) {
int thread_iters = 0; int thread_iters = 0;
int op; int op;
volatile int *signal = &((thread_args_t *)thread_args)->process->shm->signal; volatile int *signal = &((thread_args_t *)thread_args)->process->shm->signal;
volatile double *throttle_factor = &((thread_args_t *)thread_args)->process->shm->throttle_factor;
volatile int *readycount = volatile int *readycount =
&((thread_args_t *)thread_args)->process->shm->readycount; &((thread_args_t *)thread_args)->process->shm->readycount;
mako_stats_t *stats = mako_stats_t *stats =
@ -747,8 +755,8 @@ void *worker_thread(void *thread_args) {
(unsigned int)pthread_self()); (unsigned int)pthread_self());
} }
if (args->tps) { if (args->tpsmax) {
thread_tps = compute_thread_tps(args->tps, worker_id, thread_id, thread_tps = compute_thread_tps(args->tpsmax, worker_id, thread_id,
args->num_processes, args->num_threads); args->num_processes, args->num_threads);
} }
@ -785,8 +793,8 @@ void *worker_thread(void *thread_args) {
/* run the workload */ /* run the workload */
else if (args->mode == MODE_RUN) { else if (args->mode == MODE_RUN) {
rc = run_workload(transaction, args, thread_tps, thread_iters, signal, rc = run_workload(transaction, args, thread_tps, throttle_factor,
stats); thread_iters, signal, stats);
if (rc < 0) { if (rc < 0) {
fprintf(stderr, "ERROR: run_workload failed\n"); fprintf(stderr, "ERROR: run_workload failed\n");
} }
@ -1014,7 +1022,10 @@ int init_args(mako_args_t *args) {
args->rows = 10000; args->rows = 10000;
args->seconds = 0; args->seconds = 0;
args->iteration = 0; args->iteration = 0;
args->tps = 0; args->tpsmax = 0;
args->tpsmin = -1;
args->tpsinterval = 10;
args->tpschange = TPS_SIN;
args->sampling = 1000; args->sampling = 1000;
args->key_length = 16; args->key_length = 16;
args->value_length = 16; args->value_length = 16;
@ -1172,7 +1183,10 @@ void usage() {
printf("%-24s%s\n", " --keylen=LENGTH", "Specify the key lengths"); printf("%-24s%s\n", " --keylen=LENGTH", "Specify the key lengths");
printf("%-24s%s\n", " --vallen=LENGTH", "Specify the value lengths"); printf("%-24s%s\n", " --vallen=LENGTH", "Specify the value lengths");
printf("%-24s%s\n", "-x, --transaction=SPEC", "Transaction specification"); printf("%-24s%s\n", "-x, --transaction=SPEC", "Transaction specification");
printf("%-24s%s\n", " --tps=TPS", "Specify the target TPS"); printf("%-24s%s\n", " --tps|--tpsmax=TPS", "Specify the target max TPS");
printf("%-24s%s\n", " --tpsmin=TPS", "Specify the target min TPS");
printf("%-24s%s\n", " --tpsinterval=SEC", "Specify the TPS change interval (Default: 10 seconds)");
printf("%-24s%s\n", " --tpschange=<sin|square|pulse>", "Specify the TPS change type (Default: sin)");
printf("%-24s%s\n", " --sampling=RATE", printf("%-24s%s\n", " --sampling=RATE",
"Specify the sampling rate for latency stats"); "Specify the sampling rate for latency stats");
printf("%-24s%s\n", "-m, --mode=MODE", printf("%-24s%s\n", "-m, --mode=MODE",
@ -1205,6 +1219,10 @@ int parse_args(int argc, char *argv[], mako_args_t *args) {
{"vallen", required_argument, NULL, ARG_VALLEN}, {"vallen", required_argument, NULL, ARG_VALLEN},
{"transaction", required_argument, NULL, 'x'}, {"transaction", required_argument, NULL, 'x'},
{"tps", required_argument, NULL, ARG_TPS}, {"tps", required_argument, NULL, ARG_TPS},
{"tpsmax", required_argument, NULL, ARG_TPSMAX},
{"tpsmin", required_argument, NULL, ARG_TPSMIN},
{"tpsinterval", required_argument, NULL, ARG_TPSINTERVAL},
{"tpschange", required_argument, NULL, ARG_TPSCHANGE},
{"sampling", required_argument, NULL, ARG_SAMPLING}, {"sampling", required_argument, NULL, ARG_SAMPLING},
{"verbose", required_argument, NULL, 'v'}, {"verbose", required_argument, NULL, 'v'},
{"mode", required_argument, NULL, 'm'}, {"mode", required_argument, NULL, 'm'},
@ -1273,7 +1291,26 @@ int parse_args(int argc, char *argv[], mako_args_t *args) {
args->value_length = atoi(optarg); args->value_length = atoi(optarg);
break; break;
case ARG_TPS: case ARG_TPS:
args->tps = atoi(optarg); case ARG_TPSMAX:
args->tpsmax = atoi(optarg);
break;
case ARG_TPSMIN:
args->tpsmin = atoi(optarg);
break;
case ARG_TPSINTERVAL:
args->tpsinterval = atoi(optarg);
break;
case ARG_TPSCHANGE:
if (strcmp(optarg, "sin") == 0)
args->tpschange = TPS_SIN;
else if (strcmp(optarg, "square") == 0)
args->tpschange = TPS_SQUARE;
else if (strcmp(optarg, "pulse") == 0)
args->tpschange = TPS_PULSE;
else {
fprintf(stderr, "--tpschange must be sin, square or pulse\n");
return -1;
}
break; break;
case ARG_SAMPLING: case ARG_SAMPLING:
args->sampling = atoi(optarg); args->sampling = atoi(optarg);
@ -1300,6 +1337,9 @@ int parse_args(int argc, char *argv[], mako_args_t *args) {
break; break;
} }
} }
if ((args->tpsmin == -1) || (args->tpsmin > args->tpsmax)) {
args->tpsmin = args->tpsmax;
}
return 0; return 0;
} }
@ -1529,7 +1569,19 @@ void print_report(mako_args_t *args, mako_stats_t *stats,
(double)durationns / 1000000000); (double)durationns / 1000000000);
printf("Total Processes: %8d\n", args->num_processes); printf("Total Processes: %8d\n", args->num_processes);
printf("Total Threads: %8d\n", args->num_threads); printf("Total Threads: %8d\n", args->num_threads);
printf("Target TPS: %8d\n", args->tps); if (args->tpsmax == args->tpsmin)
printf("Target TPS: %8d\n", args->tpsmax);
else {
printf("Target TPS (MAX): %8d\n", args->tpsmax);
printf("Target TPS (MIN): %8d\n", args->tpsmin);
printf("TPS Interval: %8d\n", args->tpsinterval);
printf("TPS Change: ");
switch (args->tpschange) {
case TPS_SIN: printf("%8s\n", "SIN"); break;
case TPS_SQUARE: printf("%8s\n", "SQUARE"); break;
case TPS_PULSE: printf("%8s\n", "PULSE"); break;
}
}
printf("Total Xacts: %8lld\n", totalxacts); printf("Total Xacts: %8lld\n", totalxacts);
printf("Total Conflicts: %8lld\n", conflicts); printf("Total Conflicts: %8lld\n", conflicts);
printf("Total Errors: %8lld\n", totalerrors); printf("Total Errors: %8lld\n", totalerrors);
@ -1603,8 +1655,9 @@ void print_report(mako_args_t *args, mako_stats_t *stats,
} }
int stats_process_main(mako_args_t *args, mako_stats_t *stats, int stats_process_main(mako_args_t *args, mako_stats_t *stats,
volatile int *signal) { volatile double *throttle_factor, volatile int *signal) {
struct timespec timer_start, timer_prev, timer_now; struct timespec timer_start, timer_prev, timer_now;
double sin_factor;
/* wait until the signal turn on */ /* wait until the signal turn on */
while (*signal == SIGNAL_OFF) { while (*signal == SIGNAL_OFF) {
@ -1620,13 +1673,45 @@ int stats_process_main(mako_args_t *args, mako_stats_t *stats,
while (*signal != SIGNAL_RED) { while (*signal != SIGNAL_RED) {
usleep(100000); /* sleep for 100ms */ usleep(100000); /* sleep for 100ms */
clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now); clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now);
/* roughly 1 sec */
/* print stats every (roughly) 1 sec */
if (timer_now.tv_sec > timer_prev.tv_sec) { if (timer_now.tv_sec > timer_prev.tv_sec) {
/* adjust throttle rate if needed */
if (args->tpsmax != args->tpsmin) {
/* set the throttle factor between 0.0 and 1.0 */
switch (args->tpschange) {
case TPS_SIN:
sin_factor = sin((timer_now.tv_sec % args->tpsinterval) / (double)args->tpsinterval * M_PI * 2) / 2.0 + 0.5;
*throttle_factor = 1 - (sin_factor * (1.0 - ((double)args->tpsmin / args->tpsmax)));
break;
case TPS_SQUARE:
if (timer_now.tv_sec % args->tpsinterval < (args->tpsinterval / 2)) {
/* set to max */
*throttle_factor = 1.0;
} else {
/* set to min */
*throttle_factor = (double)args->tpsmin / (double)args->tpsmax;
}
break;
case TPS_PULSE:
if (timer_now.tv_sec % args->tpsinterval == 0) {
/* set to max */
*throttle_factor = 1.0;
} else {
/* set to min */
*throttle_factor = (double)args->tpsmin / (double)args->tpsmax;
}
break;
}
}
if (args->verbose >= VERBOSE_DEFAULT) if (args->verbose >= VERBOSE_DEFAULT)
print_stats(args, stats, &timer_now, &timer_prev); print_stats(args, stats, &timer_now, &timer_prev);
timer_prev.tv_sec = timer_now.tv_sec; timer_prev.tv_sec = timer_now.tv_sec;
timer_prev.tv_nsec = timer_now.tv_nsec; timer_prev.tv_nsec = timer_now.tv_nsec;
} }
} }
/* print report */ /* print report */
@ -1714,6 +1799,7 @@ int main(int argc, char *argv[]) {
/* get ready */ /* get ready */
shm->signal = SIGNAL_OFF; shm->signal = SIGNAL_OFF;
shm->readycount = 0; shm->readycount = 0;
shm->throttle_factor = 1.0;
/* fork worker processes */ /* fork worker processes */
worker_pids = (pid_t *)calloc(sizeof(pid_t), args.num_processes); worker_pids = (pid_t *)calloc(sizeof(pid_t), args.num_processes);
@ -1765,7 +1851,7 @@ int main(int argc, char *argv[]) {
/* no stats needed for clean mode */ /* no stats needed for clean mode */
exit(0); exit(0);
} }
stats_process_main(&args, stats, &shm->signal); stats_process_main(&args, stats, &shm->throttle_factor, &shm->signal);
exit(0); exit(0);
} }

View File

@ -34,36 +34,50 @@
*/ */
/* transaction specification */ /* transaction specification */
#define OP_GETREADVERSION 0 enum Operations {
#define OP_GET 1 OP_GETREADVERSION,
#define OP_GETRANGE 2 OP_GET,
#define OP_SGET 3 OP_GETRANGE,
#define OP_SGETRANGE 4 OP_SGET,
#define OP_UPDATE 5 OP_SGETRANGE,
#define OP_INSERT 6 OP_UPDATE,
#define OP_INSERTRANGE 7 OP_INSERT,
#define OP_CLEAR 8 OP_INSERTRANGE,
#define OP_SETCLEAR 9 OP_CLEAR,
#define OP_CLEARRANGE 10 OP_SETCLEAR,
#define OP_SETCLEARRANGE 11 OP_CLEARRANGE,
#define OP_COMMIT 12 OP_SETCLEARRANGE,
#define MAX_OP 13 /* update this when adding a new operation */ OP_COMMIT,
MAX_OP /* must be the last item */
};
#define OP_COUNT 0 #define OP_COUNT 0
#define OP_RANGE 1 #define OP_RANGE 1
#define OP_REVERSE 2 #define OP_REVERSE 2
/* for arguments */ /* for arguments */
#define ARG_KEYLEN 1 enum Arguments {
#define ARG_VALLEN 2 ARG_KEYLEN,
#define ARG_TPS 3 ARG_VALLEN,
#define ARG_COMMITGET 4 ARG_TPS,
#define ARG_SAMPLING 5 ARG_COMMITGET,
#define ARG_VERSION 6 ARG_SAMPLING,
#define ARG_KNOBS 7 ARG_VERSION,
#define ARG_FLATBUFFERS 8 ARG_KNOBS,
#define ARG_TRACE 9 ARG_FLATBUFFERS,
#define ARG_TRACEPATH 10 ARG_TRACE,
ARG_TRACEPATH,
ARG_TPSMAX,
ARG_TPSMIN,
ARG_TPSINTERVAL,
ARG_TPSCHANGE
};
enum TPSChangeTypes {
TPS_SIN,
TPS_SQUARE,
TPS_PULSE
};
#define KEYPREFIX "mako" #define KEYPREFIX "mako"
#define KEYPREFIXLEN 4 #define KEYPREFIXLEN 4
@ -84,7 +98,10 @@ typedef struct {
int rows; /* is 2 billion enough? */ int rows; /* is 2 billion enough? */
int seconds; int seconds;
int iteration; int iteration;
int tps; int tpsmax;
int tpsmin;
int tpsinterval;
int tpschange;
int sampling; int sampling;
int key_length; int key_length;
int value_length; int value_length;
@ -107,6 +124,7 @@ typedef struct {
typedef struct { typedef struct {
int signal; int signal;
int readycount; int readycount;
double throttle_factor;
} mako_shmhdr_t; } mako_shmhdr_t;
typedef struct { typedef struct {