Added detailed latency for each operation.

Description
Median, 95 %ile, 99 %ile, 99.9 %ile.

Testing
This commit is contained in:
Bhawani Shankar Sharma 2020-06-22 21:11:27 +00:00
parent b5113527f8
commit 6a7d3bafe3
4 changed files with 216 additions and 118 deletions

View File

@ -99,25 +99,7 @@ int commit_transaction(FDBTransaction* transaction) {
return FDB_SUCCESS;
}
void update_op_lat_stats(struct timespec* start, struct timespec* end, int op, mako_stats_t* stats) {
uint64_t latencyus;
latencyus = (((uint64_t)end->tv_sec * 1000000000 + end->tv_nsec) -
((uint64_t)start->tv_sec * 1000000000 + start->tv_nsec)) /
1000;
stats->latency_samples[op]++;
stats->latency_us_total[op] += latencyus;
if (latencyus < stats->latency_us_min[op]) {
stats->latency_us_min[op] = latencyus;
}
if (latencyus > stats->latency_us_max[op]) {
stats->latency_us_max[op] = latencyus;
}
}
void update_op_lat_stats_Temp(struct timespec* start, struct timespec* end, int op, mako_stats_t* stats, uint64_t* data[], int* elem_size) {
void update_op_lat_stats(struct timespec* start, struct timespec* end, int op, mako_stats_t* stats, lat_block_t* block[], int* elem_size, bool* is_memory_allocated) {
uint64_t latencyus;
latencyus = (((uint64_t)end->tv_sec * 1000000000 + end->tv_nsec) -
@ -131,13 +113,19 @@ void update_op_lat_stats_Temp(struct timespec* start, struct timespec* end, int
if (latencyus > stats->latency_us_max[op]) {
stats->latency_us_max[op] = latencyus;
}
if(!is_memory_allocated[op]) return;
if(elem_size[op]<stats->latency_samples[op]){
elem_size[op] = elem_size[op]*2+1;
data[op] = (uint64_t*)realloc(data[op],sizeof(uint64_t)*(elem_size[op]));
//printf("%d_%lld_%d\n",op,latencyus,elem_size[op]);
elem_size[op] = elem_size[op]+LAT_BLOCK_SIZE;
lat_block_t* temp_block = (lat_block_t*)malloc(sizeof(lat_block_t));
if(temp_block==NULL){
is_memory_allocated[op] = false;
return;
}
temp_block->next_block = NULL;
block[op]->next_block = (lat_block_t*)temp_block;
block[op] = temp_block;
}
data[op][stats->latency_samples[op]-1] = latencyus;
block[op]->data[(stats->latency_samples[op]-1)%LAT_BLOCK_SIZE] = latencyus;
}
/* FDB network thread */
@ -182,11 +170,12 @@ failExit:
/* populate database */
int populate(FDBTransaction* transaction, mako_args_t* args, int worker_id, int thread_id, int thread_tps,
mako_stats_t* stats) {
mako_stats_t* stats, lat_block_t* block[], int* elem_size, bool* is_memory_allocated) {
int i;
struct timespec timer_start, timer_end;
struct timespec timer_prev, timer_now; /* for throttling */
struct timespec timer_per_xact_start, timer_per_xact_end;
struct timespec timer_start_commit;
char* keystr;
char* valstr;
@ -264,12 +253,13 @@ int populate(FDBTransaction* transaction, mako_args_t* args, int worker_id, int
/* commit every 100 inserts (default) */
if (i % args->txnspec.ops[OP_INSERT][OP_COUNT] == 0) {
clock_gettime(CLOCK_MONOTONIC, &timer_start_commit);
if (commit_transaction(transaction) != FDB_SUCCESS) goto failExit;
/* xact latency stats */
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats);
update_op_lat_stats(&timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated);
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_start);
@ -279,11 +269,13 @@ int populate(FDBTransaction* transaction, mako_args_t* args, int worker_id, int
}
}
clock_gettime(CLOCK_MONOTONIC, &timer_start_commit);
if (commit_transaction(transaction) != FDB_SUCCESS) goto failExit;
/* xact latency stats */
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats);
update_op_lat_stats(&timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated);
clock_gettime(CLOCK_MONOTONIC, &timer_end);
stats->xacts++;
@ -420,12 +412,13 @@ int run_op_clearrange(FDBTransaction* transaction, char* keystr, char* keystr2)
/* run one transaction */
int run_one_transaction(FDBTransaction* transaction, mako_args_t* args, mako_stats_t* stats, char* keystr,
char* keystr2, char* valstr, uint64_t* data[], int * elem_size) {
char* keystr2, char* valstr, lat_block_t* block[], int * elem_size, bool* is_memory_allocated) {
int i;
int count;
int rc;
struct timespec timer_start, timer_end;
struct timespec timer_per_xact_start, timer_per_xact_end;
struct timespec timer_start_commit;
int docommit = 0;
int keynum;
int keyend;
@ -519,11 +512,14 @@ retryTxn:
rc = run_op_insert(transaction, keystr, valstr);
if (rc == FDB_SUCCESS) {
/* commit insert so mutation goes to storage */
// timer_1
clock_gettime(CLOCK_MONOTONIC, &timer_start_commit);
rc = commit_transaction(transaction);
if (rc == FDB_SUCCESS) {
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats_Temp(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats,data,elem_size);
update_op_lat_stats(&timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated);
} else {
/* error */
if (rc == FDB_ERROR_CONFLICT) {
@ -569,11 +565,13 @@ retryTxn:
}
}
/* commit insert so mutation goes to storage */
clock_gettime(CLOCK_MONOTONIC, &timer_start_commit);
rc = commit_transaction(transaction);
if (rc == FDB_SUCCESS) {
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats_Temp(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats,data,elem_size);
update_op_lat_stats(&timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated);
} else {
/* error */
if (rc == FDB_ERROR_CONFLICT) {
@ -601,7 +599,7 @@ retryTxn:
clock_gettime(CLOCK_MONOTONIC, &timer_end);
if (rc == FDB_SUCCESS) {
/* per op latency, record successful transactions */
update_op_lat_stats_Temp(&timer_start, &timer_end, i, stats, data,elem_size);
update_op_lat_stats(&timer_start, &timer_end, i, stats, block, elem_size, is_memory_allocated);
}
}
@ -628,12 +626,14 @@ retryTxn:
/* commit only successful transaction */
if (docommit | args->commit_get) {
clock_gettime(CLOCK_MONOTONIC, &timer_start_commit);
rc = commit_transaction(transaction);
if (rc == FDB_SUCCESS) {
/* success */
stats->ops[OP_COMMIT]++;
clock_gettime(CLOCK_MONOTONIC, &timer_per_xact_end);
update_op_lat_stats_Temp(&timer_per_xact_start, &timer_per_xact_end, OP_COMMIT, stats, data,elem_size);
update_op_lat_stats(&timer_start_commit, &timer_per_xact_end, OP_COMMIT, stats, block, elem_size, is_memory_allocated);
update_op_lat_stats(&timer_per_xact_start, &timer_per_xact_end, OP_TRANSACTION, stats, block, elem_size, is_memory_allocated);
} else {
/* error */
if (rc == FDB_ERROR_CONFLICT) {
@ -658,7 +658,7 @@ retryTxn:
}
int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps, volatile double* throttle_factor,
int thread_iters, volatile int* signal, mako_stats_t* stats, int dotrace, uint64_t* data[], int* elem_size) {
int thread_iters, volatile int* signal, mako_stats_t* stats, int dotrace, lat_block_t* block[], int* elem_size, bool* is_memory_allocated) {
int xacts = 0;
int64_t total_xacts = 0;
int rc = 0;
@ -742,7 +742,7 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
}
} /* throttle or txntrace */
rc = run_one_transaction(transaction, args, stats, keystr, keystr2, valstr,data, elem_size);
rc = run_one_transaction(transaction, args, stats, keystr, keystr2, valstr, block, elem_size, is_memory_allocated);
if (rc) {
/* FIXME: run_one_transaction should return something meaningful */
fprintf(annoyme, "ERROR: run_one_transaction failed (%d)\n", rc);
@ -770,15 +770,10 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
return rc;
}
void getFileName(char filename[], int worker_id, int thread_id, int op){
char str1[10],str2[10];
sprintf(str1,"%d",worker_id+1);
sprintf(str2,"%d",thread_id+1);
strcat(filename,"/dev/shm/tempDataFile/");
void get_file_name(char filename[], int worker_id, int thread_id, int op){
char str1[256];
sprintf(str1,"/%d_%d_",worker_id+1,thread_id+1);
strcat(filename,str1);
strcat(filename,"_");
strcat(filename,str2);
strcat(filename,"_");
switch (op) {
case OP_GETREADVERSION:
strcat(filename, "GRV");
@ -817,6 +812,9 @@ void getFileName(char filename[], int worker_id, int thread_id, int op){
strcat(filename, "SETCLRRANGE");
break;
case OP_COMMIT:
strcat(filename, "COMMIT");
break;
case OP_TRANSACTION:
strcat(filename, "TPS");
break;
}
@ -834,7 +832,7 @@ void* worker_thread(void* thread_args) {
int thread_tps = 0;
int thread_iters = 0;
int op;
int i;
int i, size;
int dotrace = (worker_id == 0 && thread_id == 0 && args->txntrace) ? args->txntrace : 0;
volatile int* signal = &((thread_args_t*)thread_args)->process->shm->signal;
volatile double* throttle_factor = &((thread_args_t*)thread_args)->process->shm->throttle_factor;
@ -843,8 +841,10 @@ void* worker_thread(void* thread_args) {
mako_stats_t* stats = (void*)((thread_args_t*)thread_args)->process->shm + sizeof(mako_shmhdr_t) /* skip header */
+ (sizeof(mako_stats_t) * (worker_id * args->num_threads + thread_id));
uint64_t** data = &((thread_args_t*)thread_args)->data[0];
lat_block_t** block = &((thread_args_t*)thread_args)->block[0];
int* elem_size = &((thread_args_t*)thread_args)->elem_size[0];
pid_t* parent_id = &((thread_args_t*)thread_args)->process->parent_id;
bool* is_memory_allocated = &((thread_args_t*)thread_args)->is_memory_allocated[0];
/* init latency */
for (op = 0; op < MAX_OP; op++) {
@ -886,7 +886,7 @@ void* worker_thread(void* thread_args) {
/* build/popualte */
else if (args->mode == MODE_BUILD) {
rc = populate(transaction, args, worker_id, thread_id, thread_tps, stats);
rc = populate(transaction, args, worker_id, thread_id, thread_tps, stats, block, elem_size, is_memory_allocated);
if (rc < 0) {
fprintf(stderr, "ERROR: populate failed\n");
}
@ -894,33 +894,57 @@ void* worker_thread(void* thread_args) {
/* run the workload */
else if (args->mode == MODE_RUN) {
rc = run_workload(transaction, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, data, elem_size);
rc = run_workload(transaction, args, thread_tps, throttle_factor, thread_iters, signal, stats, dotrace, block, elem_size, is_memory_allocated);
if (rc < 0) {
fprintf(stderr, "ERROR: run_workload failed\n");
}
rc = mkdir("/dev/shm/tempDataFile", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
if(args->mode == MODE_BUILD || args->mode == MODE_RUN) {
char str2[1000];
sprintf(str2, "/dev/shm/tempDataFile%d", *parent_id);
rc = mkdir(str2, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT || op==OP_TRANSACTION) {
FILE* fp;
char fileName[NAME_MAX] = {'\0'};
getFileName(fileName, worker_id, thread_id, op);
fp = fopen(fileName,"w");
int size = stats->latency_samples[op];
for(i=0; i<size; i++){
fwrite(&data[op][i], sizeof(uint64_t),1,fp);
char file_name[NAME_MAX] = {'\0'};
strcat(file_name,str2);
get_file_name(file_name, worker_id, thread_id, op);
fp = fopen(file_name,"w");
lat_block_t* temp_block = block[op];
if(is_memory_allocated[op]){
size = stats->latency_samples[op]/LAT_BLOCK_SIZE;
for(i=0; i<size; i++){
fwrite(&temp_block->data, sizeof(uint64_t)*LAT_BLOCK_SIZE,1,fp);
temp_block = temp_block->next_block;
}
size = stats->latency_samples[op]%LAT_BLOCK_SIZE;
if(size != 0) fwrite(&temp_block->data, sizeof(uint64_t)*size,1,fp);
}
else {
while(temp_block){
fwrite(&temp_block->data, sizeof(uint64_t)*LAT_BLOCK_SIZE,1,fp);
temp_block = temp_block->next_block;
}
}
fclose(fp);
}
}
__sync_fetch_and_add(stopcount, 1);
}
__sync_fetch_and_add(stopcount, 1);
/* fall through */
failExit:
for(op=0; i<MAX_OP; i++) {
if(args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if(stats->latency_samples[op]) free(data[op]);
for(op=0; op<MAX_OP; op++) {
lat_block_t* curr = block[op];
lat_block_t* prev = NULL;
size = elem_size[op]/LAT_BLOCK_SIZE;
while(size--){
prev = curr;
curr = curr->next_block;
free(prev);
}
}
fdb_transaction_destroy(transaction);
@ -928,7 +952,7 @@ failExit:
}
/* mako worker process */
int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm) {
int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pid_t* pid_main) {
int i;
pthread_t network_thread; /* handle for thread which invoked fdb_run_network() */
pthread_t* worker_threads;
@ -941,6 +965,7 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm) {
fdb_error_t err;
process.worker_id = worker_id;
process.parent_id = *pid_main;
process.args = args;
process.shm = (mako_shmhdr_t*)shm;
@ -1047,8 +1072,10 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm) {
for (i = 0; i < args->num_threads; i++) {
thread_args[i].thread_id = i;
for(int j=0;j<MAX_OP;j++){
thread_args[i].data[j] = (uint64_t*)malloc(sizeof(uint64_t)*INIT_STORE);
thread_args[i].elem_size[j] = 0;
thread_args[i].block[j] = (lat_block_t*)malloc(sizeof(lat_block_t));
thread_args[i].block[j]->next_block = NULL;
thread_args[i].elem_size[j] = LAT_BLOCK_SIZE;
thread_args[i].is_memory_allocated[j] = true;
}
thread_args[i].process = &process;
rc = pthread_create(&worker_threads[i], NULL, worker_thread, (void*)&thread_args[i]);
@ -1536,7 +1563,7 @@ void print_stats(mako_args_t* args, mako_stats_t* stats, struct timespec* now, s
errors_total_prev[op] = errors_total[op];
}
}
/* TPS */
/* COMMITS */
printf("%" STR(STATS_FIELD_WIDTH) ".2f ", (totalxacts - totalxacts_prev) * 1000000000.0 / durationns);
totalxacts_prev = totalxacts;
@ -1557,7 +1584,7 @@ void print_stats(mako_args_t* args, mako_stats_t* stats, struct timespec* now, s
}
void print_stats_header(mako_args_t* args) {
void print_stats_header(mako_args_t* args, bool show_TPS) {
int op;
int i;
@ -1602,10 +1629,14 @@ void print_stats_header(mako_args_t* args) {
case OP_SETCLEARRANGE:
printf("%" STR(STATS_FIELD_WIDTH) "s ", "SETCLRRANGE");
break;
case OP_COMMIT:
printf("%" STR(STATS_FIELD_WIDTH) "s ", "COMMIT");
break;
}
}
}
printf("%" STR(STATS_FIELD_WIDTH) "s ", "TPS");
printf("%" STR(STATS_FIELD_WIDTH) "s ", "COMMITS");
if(show_TPS) printf("%" STR(STATS_FIELD_WIDTH) "s ", "TPS");
printf("%" STR(STATS_FIELD_WIDTH) "s\n", "Conflicts/s");
for (i = 0; i < STATS_TITLE_WIDTH; i++) printf("=");
@ -1616,15 +1647,21 @@ void print_stats_header(mako_args_t* args) {
printf(" ");
}
}
/* TPS */
/* COMMITS */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf(" ");
/* TPS */
if(show_TPS) {
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf(" ");
}
/* Conflicts */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf("\n");
}
void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer_now, struct timespec* timer_start) {
void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer_now, struct timespec* timer_start, pid_t* pid_main) {
int i, j, k, op, index;
uint64_t totalxacts = 0;
uint64_t conflicts = 0;
@ -1652,7 +1689,7 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
totalxacts += stats[idx].xacts;
conflicts += stats[idx].conflicts;
for (op = 0; op < MAX_OP; op++) {
if ((args->txnspec.ops[op][OP_COUNT] > 0) || (op == OP_COMMIT)) {
if ((args->txnspec.ops[op][OP_COUNT] > 0) || (op == OP_COMMIT) || (op == OP_TRANSACTION)) {
totalerrors += stats[idx].errors[op];
ops_total[op] += stats[idx].ops[op];
errors_total[op] += stats[idx].errors[op];
@ -1698,33 +1735,54 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
printf("Overall TPS: %8lld\n\n", totalxacts * 1000000000 / durationns);
/* per-op stats */
print_stats_header(args);
print_stats_header(args, true);
/* OPS */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Total OPS");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 && op != OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 && op != OP_COMMIT && op!= OP_TRANSACTION) {
printf("%" STR(STATS_FIELD_WIDTH) "lld ", ops_total[op]);
}
}
/* COMMITS */
printf("%" STR(STATS_FIELD_WIDTH) ".2f ", totalxacts * 1000000000.0 / durationns);
/* TPS */
printf("%" STR(STATS_FIELD_WIDTH) ".2f ", totalxacts * 1000000000.0 / durationns);
/* Conflicts */
printf("%" STR(STATS_FIELD_WIDTH) ".2f\n", conflicts * 1000000000.0 / durationns);
/* Errors */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Errors");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 && op != OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 && op != OP_COMMIT && op!= OP_TRANSACTION) {
printf("%" STR(STATS_FIELD_WIDTH) "lld ", errors_total[op]);
}
}
printf("\n\n");
printf("%s\n", "Latency (us)");
for (i = 0; i < STATS_TITLE_WIDTH; i++) printf("=");
printf("\n");
/* Total Samples */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Samples");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT || op == OP_TRANSACTION) {
if (lat_total[op]) {
printf("%" STR(STATS_FIELD_WIDTH) "lld ", lat_samples[op]);
} else {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
/* Min Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Lat Min (us)");
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Min");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT || op == OP_TRANSACTION) {
if (lat_min[op] == -1) {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
} else {
@ -1735,9 +1793,9 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
printf("\n");
/* Avg Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Lat Avg (us)");
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Avg");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT || op == OP_TRANSACTION) {
if (lat_total[op]) {
printf("%" STR(STATS_FIELD_WIDTH) "lld ", lat_total[op] / lat_samples[op]);
} else {
@ -1748,9 +1806,9 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
printf("\n");
/* Max Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Lat Max (us)");
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Max");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT || op == OP_TRANSACTION) {
if (lat_max[op] == 0) {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
} else {
@ -1760,33 +1818,25 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
}
printf("\n");
/* Total Samples */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Total Sample");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT) {
if (lat_total[op]) {
printf("%" STR(STATS_FIELD_WIDTH) "lld ", lat_samples[op]);
} else {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
uint64_t* dataPoints[MAX_OP];
uint64_t median;
int point_99pct;
int point_99_9pct, point_99pct, point_95pct;
/* Median Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Median");
int num_points[MAX_OP] = {0};
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT || op==OP_TRANSACTION) {
if(lat_total[op]){
dataPoints[op] = (uint64_t*)malloc(sizeof(uint64_t)*lat_samples[op]);
k = 0;
for (i = 0; i < args->num_processes; i++) {
for (j = 0; j < args->num_threads; j++) {
char fileName[NAME_MAX] = {'\0'};
getFileName(fileName, i, j, op);
FILE* f = fopen(fileName,"r");
char file_name[NAME_MAX] = {'\0'};
sprintf(file_name, "/dev/shm/tempDataFile%d", *pid_main);
get_file_name(file_name, i, j, op);
FILE* f = fopen(file_name,"r");
fseek(f,0,SEEK_END);
int numPoints = ftell(f)/sizeof(uint64_t);
fseek(f,0,0);
@ -1798,12 +1848,13 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
fclose(f);
}
}
radix_sort(dataPoints[op],lat_samples[op]);
if(lat_samples[op] & 1){
median = dataPoints[op][lat_samples[op]/2];
num_points[op] = k;
radix_sort(dataPoints[op],num_points[op]);
if(num_points[op] & 1){
median = dataPoints[op][num_points[op]/2];
}
else{
median = (dataPoints[op][lat_samples[op]/2]+dataPoints[op][lat_samples[op]/2-1])>>1;
median = (dataPoints[op][num_points[op]/2]+dataPoints[op][num_points[op]/2-1])>>1;
}
printf("%" STR(STATS_FIELD_WIDTH) "lld ", median);
}
@ -1814,11 +1865,27 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
}
printf("\n");
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "99.9 pctile");
/* 95%ile Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "95.0 pctile");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT || op==OP_TRANSACTION) {
if(lat_total[op]){
point_99pct = ((float)(lat_samples[op])*0.999)-1;
point_95pct = ((float)(num_points[op])*0.95)-1;
printf("%" STR(STATS_FIELD_WIDTH) "lld ", dataPoints[op][point_95pct]);
}
else{
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
/* 99%ile Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "99.0 pctile");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT || op==OP_TRANSACTION) {
if(lat_total[op]){
point_99pct = ((float)(num_points[op])*0.99)-1;
printf("%" STR(STATS_FIELD_WIDTH) "lld ", dataPoints[op][point_99pct]);
}
else{
@ -1828,15 +1895,34 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
}
printf("\n");
for(op=0; i<MAX_OP; i++) {
if(args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT) {
/* 99.9%ile Latency */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "99.9 pctile");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT || op==OP_TRANSACTION) {
if(lat_total[op]){
point_99_9pct = ((float)(num_points[op])*0.999)-1;
printf("%" STR(STATS_FIELD_WIDTH) "lld ", dataPoints[op][point_99_9pct]);
}
else{
printf("%" STR(STATS_FIELD_WIDTH) "s ", "N/A");
}
}
}
printf("\n");
char command_remove[NAME_MAX] = {'\0'};
sprintf(command_remove, "rm -rf /dev/shm/tempDataFile%d", *pid_main);
system(command_remove);
for(op=0; op<MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op==OP_COMMIT || op==OP_TRANSACTION) {
if(lat_total[op]) free(dataPoints[op]);
}
}
}
int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double* throttle_factor, volatile int* signal, volatile int* stopcount) {
int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double* throttle_factor, volatile int* signal, volatile int* stopcount, pid_t* pid_main) {
struct timespec timer_start, timer_prev, timer_now;
double sin_factor;
@ -1845,7 +1931,7 @@ int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double*
usleep(10000); /* 10ms */
}
if (args->verbose >= VERBOSE_DEFAULT) print_stats_header(args);
if (args->verbose >= VERBOSE_DEFAULT) print_stats_header(args, false);
clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_start);
timer_prev.tv_sec = timer_start.tv_sec;
@ -1899,7 +1985,7 @@ int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double*
while (*stopcount < args->num_threads*args->num_processes) {
usleep(10000); /* 10ms */
}
print_report(args, stats, &timer_now, &timer_start);
print_report(args, stats, &timer_now, &timer_start, pid_main);
}
return 0;
@ -1919,6 +2005,7 @@ int main(int argc, char* argv[]) {
char shmpath[NAME_MAX];
size_t shmsize;
mako_stats_t* stats;
pid_t pid_main;
rc = init_args(&args);
if (rc < 0) {
@ -1946,8 +2033,9 @@ int main(int argc, char* argv[]) {
}
}
pid_main = getpid();
/* create the shared memory for stats */
sprintf(shmpath, "mako%d", getpid());
sprintf(shmpath, "mako%d", pid_main);
shmfd = shm_open(shmpath, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (shmfd < 0) {
fprintf(stderr, "ERROR: shm_open failed\n");
@ -2019,7 +2107,7 @@ int main(int argc, char* argv[]) {
if (proc_type == proc_worker) {
/* worker process */
worker_process_main(&args, worker_id, shm);
worker_process_main(&args, worker_id, shm, &pid_main);
/* worker can exit here */
exit(0);
} else if (proc_type == proc_stats) {
@ -2028,12 +2116,11 @@ int main(int argc, char* argv[]) {
/* no stats needed for clean mode */
exit(0);
}
stats_process_main(&args, stats, &shm->throttle_factor, &shm->signal, &shm->stopcount);
stats_process_main(&args, stats, &shm->throttle_factor, &shm->signal, &shm->stopcount, &pid_main);
exit(0);
}
/* master */
/* wait for everyone to be ready */
while (shm->readycount < (args.num_processes * args.num_threads)) {
usleep(1000);

View File

@ -9,6 +9,7 @@
#include <foundationdb/fdb_c.h>
#include <pthread.h>
#include <sys/types.h>
#include <stdbool.h>
#if defined(__linux__)
#include <linux/limits.h>
#elif defined(__APPLE__)
@ -32,7 +33,7 @@
#define FDB_ERROR_ABORT -2
#define FDB_ERROR_CONFLICT -3
#define INIT_STORE 0
#define LAT_BLOCK_SIZE 511 /* size of each block to get detailed latency for each operation */
/* transaction specification */
enum Operations {
@ -49,6 +50,7 @@ enum Operations {
OP_CLEARRANGE,
OP_SETCLEARRANGE,
OP_COMMIT,
OP_TRANSACTION, /* pseudo-operation */
MAX_OP /* must be the last item */
};
@ -134,6 +136,11 @@ typedef struct {
int stopcount;
} mako_shmhdr_t;
typedef struct {
uint64_t data[LAT_BLOCK_SIZE];
void* next_block;
} lat_block_t;
typedef struct {
uint64_t xacts;
uint64_t conflicts;
@ -148,6 +155,7 @@ typedef struct {
/* per-process information */
typedef struct {
int worker_id;
pid_t parent_id;
FDBDatabase* database;
mako_args_t* args;
mako_shmhdr_t* shm;
@ -157,7 +165,8 @@ typedef struct {
typedef struct {
int thread_id;
int elem_size[MAX_OP];
uint64_t* data[MAX_OP];
bool is_memory_allocated[MAX_OP];
lat_block_t* block[MAX_OP];
process_info_t* process;
} thread_args_t;

View File

@ -81,7 +81,7 @@ void genkey(char* str, int num, int rows, int len) {
}
uint64_t getMax(uint64_t arr[], int n)
uint64_t get_max(uint64_t arr[], int n)
{
uint64_t mx = arr[0];
for (int i = 1; i < n; i++)
@ -90,7 +90,7 @@ uint64_t getMax(uint64_t arr[], int n)
return mx;
}
void countSort(uint64_t arr[], int n, uint64_t exp)
void bucket_data(uint64_t arr[], int n, uint64_t exp)
{
uint64_t output[n];
int i, count[10] = {0};
@ -116,7 +116,7 @@ void countSort(uint64_t arr[], int n, uint64_t exp)
// Radix Sort
void radix_sort(uint64_t arr[], int n) {
// Find the maximum number to know number of digits
uint64_t m = getMax(arr, n);
uint64_t m = get_max(arr, n);
for (uint64_t exp = 1; m/exp > 0; exp *= 10)
countSort(arr, n, exp);
bucket_data(arr, n, exp);
}

View File

@ -3,6 +3,7 @@
#pragma once
#include <stdint.h>
#include <ftw.h>
/* uniform-distribution random */
/* return a uniform random number between low and high, both inclusive */
@ -54,7 +55,8 @@ void genkey(char* str, int num, int rows, int len);
// The main function to that sorts arr[] of size n using
// Radix Sort
void radix_sort(uint64_t arr[], int n);
void countSort(uint64_t arr[], int n, uint64_t exp) ;
uint64_t getMax(uint64_t arr[], int n) ;
void bucket_data(uint64_t arr[], int n, uint64_t exp) ;
uint64_t get_max(uint64_t arr[], int n) ;
#endif /* UTILS_H */