added median and 99.9 percentile latency measurement

Description

Testing
This commit is contained in:
Bhawani Shankar Sharma 2020-06-18 21:02:48 +00:00
parent 3fe8db55c5
commit b5113527f8
4 changed files with 259 additions and 11 deletions

View File

@ -9,6 +9,8 @@
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#if defined(__linux__)
#include <linux/limits.h>
@ -111,6 +113,31 @@ void update_op_lat_stats(struct timespec* start, struct timespec* end, int op, m
if (latencyus > stats->latency_us_max[op]) {
stats->latency_us_max[op] = latencyus;
}
}
void update_op_lat_stats_Temp(struct timespec* start, struct timespec* end, int op, mako_stats_t* stats, uint64_t* data[], int* elem_size) {
uint64_t latencyus;
latencyus = (((uint64_t)end->tv_sec * 1000000000 + end->tv_nsec) -
((uint64_t)start->tv_sec * 1000000000 + start->tv_nsec)) /
1000;
stats->latency_samples[op]++;
stats->latency_us_total[op] += latencyus;
if (latencyus < stats->latency_us_min[op]) {
stats->latency_us_min[op] = latencyus;
}
if (latencyus > stats->latency_us_max[op]) {
stats->latency_us_max[op] = latencyus;
}
if(elem_size[op]<stats->latency_samples[op]){
elem_size[op] = elem_size[op]*2+1;
data[op] = (uint64_t*)realloc(data[op],sizeof(uint64_t)*(elem_size[op]));
//printf("%d_%lld_%d\n",op,latencyus,elem_size[op]);
}
data[op][stats->latency_samples[op]-1] = latencyus;
}
/* FDB network thread */
@ -393,7 +420,7 @@ int run_op_clearrange(FDBTransaction* transaction, char* keystr, char* keystr2)
/* run one transaction */
int run_one_transaction(FDBTransaction* transaction, mako_args_t* args, mako_stats_t* stats, char* keystr,
char* keystr2, char* valstr) {
char* keystr2, char* valstr, uint64_t* data[], int * elem_size) {
int i;
int count;
int rc;
@ -496,7 +523,7 @@ retryTxn:
if (rc == FDB_SUCCESS) {
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats);
update_op_lat_stats_Temp(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats,data,elem_size);
} else {
/* error */
if (rc == FDB_ERROR_CONFLICT) {
@ -546,7 +573,7 @@ retryTxn:
if (rc == FDB_SUCCESS) {
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats);
update_op_lat_stats_Temp(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats,data,elem_size);
} else {
/* error */
if (rc == FDB_ERROR_CONFLICT) {
@ -574,7 +601,7 @@ retryTxn:
clock_gettime(CLOCK_MONOTONIC, &timer_end);
if (rc == FDB_SUCCESS) {
/* per op latency, record successful transactions */
update_op_lat_stats(&timer_start, &timer_end, i, stats);
update_op_lat_stats_Temp(&timer_start, &timer_end, i, stats, data,elem_size);
}
}
@ -606,7 +633,7 @@ retryTxn:
/* success */
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats);
update_op_lat_stats_Temp(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats, data,elem_size);
} else {
/* error */
if (rc == FDB_ERROR_CONFLICT) {
@ -631,7 +658,7 @@ retryTxn:
}
int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps, volatile double* throttle_factor,
int thread_iters, volatile int* signal, mako_stats_t* stats, int dotrace) {
int thread_iters, volatile int* signal, mako_stats_t* stats, int dotrace, uint64_t* data[], int* elem_size) {
int xacts = 0;
int64_t total_xacts = 0;
int rc = 0;
@ -715,7 +742,7 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
}
} /* throttle or txntrace */
rc = run_one_transaction(transaction, args, stats, keystr, keystr2, valstr);
rc = run_one_transaction(transaction, args, stats, keystr, keystr2, valstr,data, elem_size);
if (rc) {
/* FIXME: run_one_transaction should return something meaningful */
fprintf(annoyme, "ERROR: run_one_transaction failed (%d)\n", rc);
@ -743,6 +770,58 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
return rc;
}
void getFileName(char filename[], int worker_id, int thread_id, int op){
char str1[10],str2[10];
sprintf(str1,"%d",worker_id+1);
sprintf(str2,"%d",thread_id+1);
strcat(filename,"/dev/shm/tempDataFile/");
strcat(filename,str1);
strcat(filename,"_");
strcat(filename,str2);
strcat(filename,"_");
switch (op) {
case OP_GETREADVERSION:
strcat(filename, "GRV");
break;
case OP_GET:
strcat(filename, "GET");
break;
case OP_GETRANGE:
strcat(filename, "GETRANGE");
break;
case OP_SGET:
strcat(filename, "SGET");
break;
case OP_SGETRANGE:
strcat(filename, "SGETRANGE");
break;
case OP_UPDATE:
strcat(filename, "UPDATE");
break;
case OP_INSERT:
strcat(filename, "INSERT");
break;
case OP_INSERTRANGE:
strcat(filename, "INSERTRANGE");
break;
case OP_CLEAR:
strcat(filename, "CLEAR");
break;
case OP_SETCLEAR:
strcat(filename, "SETCLEAR");
break;
case OP_CLEARRANGE:
strcat(filename, "CLEARRANGE");
break;
case OP_SETCLEARRANGE:
strcat(filename, "SETCLRRANGE");
break;
case OP_COMMIT:
strcat(filename, "TPS");
break;
}
}
/* mako worker thread */
void* worker_thread(void* thread_args) {
int worker_id = ((thread_args_t*)thread_args)->process->worker_id;
@ -755,18 +834,24 @@ void* worker_thread(void* thread_args) {
int thread_tps = 0;
int thread_iters = 0;
int op;
int i;
int dotrace = (worker_id == 0 && thread_id == 0 && args->txntrace) ? args->txntrace : 0;
volatile int* signal = &((thread_args_t*)thread_args)->process->shm->signal;
volatile double* throttle_factor = &((thread_args_t*)thread_args)->process->shm->throttle_factor;
volatile int* readycount = &((thread_args_t*)thread_args)->process->shm->readycount;
volatile int* stopcount = &((thread_args_t*)thread_args)->process->shm->stopcount;
mako_stats_t* stats = (void*)((thread_args_t*)thread_args)->process->shm + sizeof(mako_shmhdr_t) /* skip header */
+ (sizeof(mako_stats_t) * (worker_id * args->num_threads + thread_id));
uint64_t** data = &((thread_args_t*)thread_args)->data[0];
int* elem_size = &((thread_args_t*)thread_args)->elem_size[0];
/* init latency */
for (op = 0; op < MAX_OP; op++) {
stats->latency_us_min[op] = 0xFFFFFFFFFFFFFFFF; /* uint64_t */
stats->latency_us_max[op] = 0;
stats->latency_us_total[op] = 0;
stats->latency_samples[op] = 0;
}
fprintf(debugme, "DEBUG: worker_id:%d (%d) thread_id:%d (%d) (tid:%d)\n", worker_id, args->num_processes, thread_id,
@ -809,14 +894,35 @@ void* worker_thread(void* thread_args) {
/* run the workload */
else if (args->mode == MODE_RUN) {
rc = run_workload(transaction, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace);
rc = run_workload(transaction, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, data, elem_size);
if (rc < 0) {
fprintf(stderr, "ERROR: run_workload failed\n");
}
rc = mkdir("/dev/shm/tempDataFile", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
FILE* fp;
char fileName[NAME_MAX] = {'\0'};
getFileName(fileName, worker_id, thread_id, op);
fp = fopen(fileName,"w");
int size = stats->latency_samples[op];
for(i=0; i<size; i++){
fwrite(&data[op][i], sizeof(uint64_t),1,fp);
}
fclose(fp);
}
}
}
__sync_fetch_and_add(stopcount, 1);
/* fall through */
failExit:
for(op=0; i<MAX_OP; i++) {
if(args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if(stats->latency_samples[op]) free(data[op]);
}
}
fdb_transaction_destroy(transaction);
pthread_exit(0);
}
@ -940,6 +1046,10 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm) {
for (i = 0; i < args->num_threads; i++) {
thread_args[i].thread_id = i;
for(int j=0;j<MAX_OP;j++){
thread_args[i].data[j] = (uint64_t*)malloc(sizeof(uint64_t)*INIT_STORE);
thread_args[i].elem_size[j] = 0;
}
thread_args[i].process = &process;
rc = pthread_create(&worker_threads[i], NULL, worker_thread, (void*)&thread_args[i]);
if (rc != 0) {
@ -959,6 +1069,7 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm) {
}
}
failExit:
if (worker_threads) free(worker_threads);
if (thread_args) free(thread_args);
@ -1445,6 +1556,7 @@ void print_stats(mako_args_t* args, mako_stats_t* stats, struct timespec* now, s
return;
}
void print_stats_header(mako_args_t* args) {
int op;
int i;
@ -1513,7 +1625,7 @@ void print_stats_header(mako_args_t* args) {
}
void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer_now, struct timespec* timer_start) {
int i, j, op;
int i, j, k, op, index;
uint64_t totalxacts = 0;
uint64_t conflicts = 0;
uint64_t totalerrors = 0;
@ -1647,9 +1759,84 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
}
}
printf("\n");
/* Total Samples */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Total Sample");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT) {
if (lat_total[op]) {
printf("%" STR(STATS_FIELD_WIDTH) "lld ", lat_samples[op]);
} else {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
uint64_t* dataPoints[MAX_OP];
uint64_t median;
int point_99pct;
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Median");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if(lat_total[op]){
dataPoints[op] = (uint64_t*)malloc(sizeof(uint64_t)*lat_samples[op]);
k = 0;
for (i = 0; i < args->num_processes; i++) {
for (j = 0; j < args->num_threads; j++) {
char fileName[NAME_MAX] = {'\0'};
getFileName(fileName, i, j, op);
FILE* f = fopen(fileName,"r");
fseek(f,0,SEEK_END);
int numPoints = ftell(f)/sizeof(uint64_t);
fseek(f,0,0);
index = 0;
while(index<numPoints) {
fread(&dataPoints[op][k++],sizeof(uint64_t),1,f);
++index;
}
fclose(f);
}
}
radix_sort(dataPoints[op],lat_samples[op]);
if(lat_samples[op] & 1){
median = dataPoints[op][lat_samples[op]/2];
}
else{
median = (dataPoints[op][lat_samples[op]/2]+dataPoints[op][lat_samples[op]/2-1])>>1;
}
printf("%" STR(STATS_FIELD_WIDTH) "lld ", median);
}
else {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "99.9 pctile");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if(lat_total[op]){
point_99pct = ((float)(lat_samples[op])*0.999)-1;
printf("%" STR(STATS_FIELD_WIDTH) "lld ", dataPoints[op][point_99pct]);
}
else{
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
for(op=0; i<MAX_OP; i++) {
if(args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if(lat_total[op]) free(dataPoints[op]);
}
}
}
int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double* throttle_factor, volatile int* signal) {
int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double* throttle_factor, volatile int* signal, volatile int* stopcount) {
struct timespec timer_start, timer_prev, timer_now;
double sin_factor;
@ -1709,6 +1896,9 @@ int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double*
/* print report */
if (args->verbose >= VERBOSE_DEFAULT) {
clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_now);
while (*stopcount < args->num_threads*args->num_processes) {
usleep(10000); /* 10ms */
}
print_report(args, stats, &timer_now, &timer_start);
}
@ -1786,6 +1976,7 @@ int main(int argc, char* argv[]) {
/* get ready */
shm->signal = SIGNAL_OFF;
shm->readycount = 0;
shm->stopcount = 0;
shm->throttle_factor = 1.0;
/* fork worker processes + 1 stats process */
@ -1837,7 +2028,7 @@ int main(int argc, char* argv[]) {
/* no stats needed for clean mode */
exit(0);
}
stats_process_main(&args, stats, &shm->throttle_factor, &shm->signal);
stats_process_main(&args, stats, &shm->throttle_factor, &shm->signal, &shm->stopcount);
exit(0);
}

View File

@ -32,6 +32,8 @@
#define FDB_ERROR_ABORT -2
#define FDB_ERROR_CONFLICT -3
#define INIT_STORE 0
/* transaction specification */
enum Operations {
OP_GETREADVERSION,
@ -129,6 +131,7 @@ typedef struct {
int signal;
int readycount;
double throttle_factor;
int stopcount;
} mako_shmhdr_t;
typedef struct {
@ -153,6 +156,8 @@ typedef struct {
/* args for threads */
typedef struct {
int thread_id;
int elem_size[MAX_OP];
uint64_t* data[MAX_OP];
process_info_t* process;
} thread_args_t;

View File

@ -4,6 +4,8 @@
#include <stdio.h>
#include <stdlib.h>
/* uniform-distribution random */
int urand(int low, int high) {
double r = rand() / (1.0 + RAND_MAX);
@ -77,3 +79,44 @@ void genkey(char* str, int num, int rows, int len) {
}
str[len - 1] = '\0';
}
uint64_t getMax(uint64_t arr[], int n)
{
uint64_t mx = arr[0];
for (int i = 1; i < n; i++)
if (arr[i] > mx)
mx = arr[i];
return mx;
}
void countSort(uint64_t arr[], int n, uint64_t exp)
{
uint64_t output[n];
int i, count[10] = {0};
for (i = 0; i < n; i++)
count[ (arr[i]/exp)%10 ]++;
for (i = 1; i < 10; i++)
count[i] += count[i - 1];
for (i = n - 1; i >= 0; i--)
{
output[count[ (arr[i]/exp)%10 ] - 1] = arr[i];
count[ (arr[i]/exp)%10 ]--;
}
for (i = 0; i < n; i++)
arr[i] = output[i];
}
// The main function to that sorts arr[] of size n using
// Radix Sort
void radix_sort(uint64_t arr[], int n) {
// Find the maximum number to know number of digits
uint64_t m = getMax(arr, n);
for (uint64_t exp = 1; m/exp > 0; exp *= 10)
countSort(arr, n, exp);
}

View File

@ -2,6 +2,8 @@
#define UTILS_H
#pragma once
#include <stdint.h>
/* uniform-distribution random */
/* return a uniform random number between low and high, both inclusive */
int urand(int low, int high);
@ -48,4 +50,11 @@ int digits(int num);
/* len is the buffer size, key length + null */
void genkey(char* str, int num, int rows, int len);
// The main function to that sorts arr[] of size n using
// Radix Sort
void radix_sort(uint64_t arr[], int n);
void countSort(uint64_t arr[], int n, uint64_t exp) ;
uint64_t getMax(uint64_t arr[], int n) ;
#endif /* UTILS_H */