changed temporary output location to /tmp

if the memory is full, then the data needs to be flushed to disk memory

Testing
This commit is contained in:
Bhawani Shankar Sharma 2020-06-26 15:15:42 +00:00
parent 9f34b957e5
commit 45cc2cacad
3 changed files with 87 additions and 78 deletions

View File

@ -518,7 +518,7 @@ retryTxn:
rc = run_op_insert(transaction, keystr, valstr);
if (rc == FDB_SUCCESS) {
/* commit insert so mutation goes to storage */
// timer_1
/* to measure commit latency */
clock_gettime(CLOCK_MONOTONIC, &timer_start_commit);
rc = commit_transaction(transaction);
if (rc == FDB_SUCCESS) {
@ -789,7 +789,7 @@ int run_workload(FDBTransaction* transaction, mako_args_t* args, int thread_tps,
return rc;
}
void get_file_name(char filename[], int worker_id, int thread_id, int op) {
void get_stats_file_name(char filename[], int worker_id, int thread_id, int op) {
char str1[256];
sprintf(str1, "/%d_%d_", worker_id + 1, thread_id + 1);
strcat(filename, str1);
@ -834,7 +834,7 @@ void get_file_name(char filename[], int worker_id, int thread_id, int op) {
strcat(filename, "COMMIT");
break;
case OP_TRANSACTION:
strcat(filename, "TPS");
strcat(filename, "TRANSACTION");
break;
}
}
@ -926,14 +926,14 @@ void* worker_thread(void* thread_args) {
if (args->mode == MODE_BUILD || args->mode == MODE_RUN) {
char str2[1000];
sprintf(str2, "/dev/shm/tempDataFile%d", *parent_id);
sprintf(str2, "%s%d", TEMP_DATA_STORE, *parent_id);
rc = mkdir(str2, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_COMMIT || op == OP_TRANSACTION) {
FILE* fp;
char file_name[NAME_MAX] = { '\0' };
strcat(file_name, str2);
get_file_name(file_name, worker_id, thread_id, op);
get_stats_file_name(file_name, worker_id, thread_id, op);
fp = fopen(file_name, "w");
lat_block_t* temp_block = ((thread_args_t*)thread_args)->block[op];
if (is_memory_allocated[op]) {
@ -960,7 +960,6 @@ void* worker_thread(void* thread_args) {
failExit:
for (op = 0; op < MAX_OP; op++) {
lat_block_t* curr = ((thread_args_t*)thread_args)->block[op];
;
lat_block_t* prev = NULL;
size = elem_size[op] / LAT_BLOCK_SIZE;
while (size--) {
@ -1093,15 +1092,17 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
for (i = 0; i < args->num_threads; i++) {
thread_args[i].thread_id = i;
for (int j = 0; j < MAX_OP; j++) {
thread_args[i].block[j] = (lat_block_t*)malloc(sizeof(lat_block_t));
if (thread_args[i].block[j] == NULL) {
thread_args[i].is_memory_allocated[j] = false;
thread_args[i].elem_size[j] = 0;
} else {
thread_args[i].is_memory_allocated[j] = true;
thread_args[i].block[j]->next_block = NULL;
thread_args[i].elem_size[j] = LAT_BLOCK_SIZE;
for (int op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_TRANSACTION || op == OP_COMMIT) {
thread_args[i].block[op] = (lat_block_t*)malloc(sizeof(lat_block_t));
if (thread_args[i].block[op] == NULL) {
thread_args[i].is_memory_allocated[op] = false;
thread_args[i].elem_size[op] = 0;
} else {
thread_args[i].is_memory_allocated[op] = true;
thread_args[i].block[op]->next_block = NULL;
thread_args[i].elem_size[op] = LAT_BLOCK_SIZE;
}
}
}
thread_args[i].process = &process;
@ -1609,12 +1610,13 @@ void print_stats(mako_args_t* args, mako_stats_t* stats, struct timespec* now, s
return;
}
void print_stats_header(mako_args_t* args, bool show_commit) {
void print_stats_header(mako_args_t* args, bool show_commit, bool is_first_header_empty, bool show_op_stats) {
int op;
int i;
/* header */
for (i = 0; i <= STATS_TITLE_WIDTH; i++) printf(" ");
if (is_first_header_empty)
for (i = 0; i <= STATS_TITLE_WIDTH; i++) printf(" ");
for (op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0) {
switch (op) {
@ -1659,8 +1661,12 @@ void print_stats_header(mako_args_t* args, bool show_commit) {
}
if (show_commit) printf("%" STR(STATS_FIELD_WIDTH) "s ", "COMMIT");
printf("%" STR(STATS_FIELD_WIDTH) "s ", "TPS");
printf("%" STR(STATS_FIELD_WIDTH) "s\n", "Conflicts/s");
if (show_op_stats) {
printf("%" STR(STATS_FIELD_WIDTH) "s\n", "TRANSACTION");
} else {
printf("%" STR(STATS_FIELD_WIDTH) "s ", "TPS");
printf("%" STR(STATS_FIELD_WIDTH) "s\n", "Conflicts/s");
}
for (i = 0; i < STATS_TITLE_WIDTH; i++) printf("=");
printf(" ");
@ -1672,17 +1678,23 @@ void print_stats_header(mako_args_t* args, bool show_commit) {
}
/* COMMIT */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf(" ");
/* TPS */
if (show_commit) {
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf(" ");
}
/* Conflicts */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
if (show_op_stats) {
/* TRANSACTION */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf(" ");
} else {
/* TPS */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
printf(" ");
/* Conflicts */
for (i = 0; i < STATS_FIELD_WIDTH; i++) printf("=");
}
printf("\n");
}
@ -1761,7 +1773,7 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
printf("Overall TPS: %8lld\n\n", totalxacts * 1000000000 / durationns);
/* per-op stats */
print_stats_header(args, true);
print_stats_header(args, true, true, false);
/* OPS */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Total OPS");
@ -1786,9 +1798,8 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
}
printf("\n\n");
printf("%s\n", "Latency (us)");
for (i = 0; i < STATS_TITLE_WIDTH; i++) printf("=");
printf("\n");
printf("%s", "Latency (us)");
print_stats_header(args, true, false, true);
/* Total Samples */
printf("%-" STR(STATS_TITLE_WIDTH) "s ", "Samples");
@ -1857,8 +1868,8 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
for (i = 0; i < args->num_processes; i++) {
for (j = 0; j < args->num_threads; j++) {
char file_name[NAME_MAX] = { '\0' };
sprintf(file_name, "/dev/shm/tempDataFile%d", *pid_main);
get_file_name(file_name, i, j, op);
sprintf(file_name, "%s%d", TEMP_DATA_STORE, *pid_main);
get_stats_file_name(file_name, i, j, op);
FILE* f = fopen(file_name, "r");
fseek(f, 0, SEEK_END);
int numPoints = ftell(f) / sizeof(uint64_t);
@ -1929,7 +1940,7 @@ void print_report(mako_args_t* args, mako_stats_t* stats, struct timespec* timer
printf("\n");
char command_remove[NAME_MAX] = { '\0' };
sprintf(command_remove, "rm -rf /dev/shm/tempDataFile%d", *pid_main);
sprintf(command_remove, "rm -rf %s%d", TEMP_DATA_STORE, *pid_main);
system(command_remove);
for (op = 0; op < MAX_OP; op++) {
@ -1949,7 +1960,7 @@ int stats_process_main(mako_args_t* args, mako_stats_t* stats, volatile double*
usleep(10000); /* 10ms */
}
if (args->verbose >= VERBOSE_DEFAULT) print_stats_header(args, false);
if (args->verbose >= VERBOSE_DEFAULT) print_stats_header(args, false, true, false);
clock_gettime(CLOCK_MONOTONIC_COARSE, &timer_start);
timer_prev.tv_sec = timer_start.tv_sec;

View File

@ -9,7 +9,7 @@
#include <foundationdb/fdb_c.h>
#include <pthread.h>
#include <sys/types.h>
#include <stdbool.h>
#include <stdbool.h>
#if defined(__linux__)
#include <linux/limits.h>
#elif defined(__APPLE__)
@ -33,7 +33,7 @@
#define FDB_ERROR_ABORT -2
#define FDB_ERROR_CONFLICT -3
#define LAT_BLOCK_SIZE 511 /* size of each block to get detailed latency for each operation */
#define LAT_BLOCK_SIZE 511 /* size of each block to get detailed latency for each operation */
/* transaction specification */
enum Operations {
@ -50,7 +50,7 @@ enum Operations {
OP_CLEARRANGE,
OP_SETCLEARRANGE,
OP_COMMIT,
OP_TRANSACTION, /* pseudo-operation */
OP_TRANSACTION, /* pseudo-operation - cumulative time for the operation + commit */
MAX_OP /* must be the last item */
};
@ -83,6 +83,8 @@ enum TPSChangeTypes { TPS_SIN, TPS_SQUARE, TPS_PULSE };
#define KEYPREFIX "mako"
#define KEYPREFIXLEN 4
#define TEMP_DATA_STORE "/tmp/makoTemp"
/* we set mako_txnspec_t and mako_args_t only once in the master process,
* and won't be touched by child processes.
*/
@ -136,9 +138,10 @@ typedef struct {
int stopcount;
} mako_shmhdr_t;
/* memory block allocated to each operation when collecting detailed latency */
typedef struct {
uint64_t data[LAT_BLOCK_SIZE];
void* next_block;
uint64_t data[LAT_BLOCK_SIZE];
void* next_block;
} lat_block_t;
typedef struct {
@ -164,8 +167,9 @@ typedef struct {
/* args for threads */
typedef struct {
int thread_id;
int elem_size[MAX_OP];
bool is_memory_allocated[MAX_OP];
int elem_size[MAX_OP]; /* stores the multiple of LAT_BLOCK_SIZE to check the memory allocation of each operation */
bool is_memory_allocated[MAX_OP]; /* flag specified for each operation, whether the memory was allocated to that
specific operation */
lat_block_t* block[MAX_OP];
process_info_t* process;
} thread_args_t;

View File

@ -4,8 +4,6 @@
#include <stdio.h>
#include <stdlib.h>
/* uniform-distribution random */
int urand(int low, int high) {
double r = rand() / (1.0 + RAND_MAX);
@ -80,43 +78,39 @@ void genkey(char* str, int num, int rows, int len) {
str[len - 1] = '\0';
}
uint64_t get_max(uint64_t arr[], int n)
{
uint64_t mx = arr[0];
for (int i = 1; i < n; i++)
if (arr[i] > mx)
mx = arr[i];
return mx;
uint64_t get_max(uint64_t arr[], int n) {
uint64_t mx = arr[0];
for (int i = 1; i < n; i++) {
if (arr[i] > mx) {
mx = arr[i];
}
}
return mx;
}
void bucket_data(uint64_t arr[], int n, uint64_t exp)
{
uint64_t output[n];
int i, count[10] = {0};
for (i = 0; i < n; i++)
count[ (arr[i]/exp)%10 ]++;
for (i = 1; i < 10; i++)
count[i] += count[i - 1];
for (i = n - 1; i >= 0; i--)
{
output[count[ (arr[i]/exp)%10 ] - 1] = arr[i];
count[ (arr[i]/exp)%10 ]--;
}
for (i = 0; i < n; i++)
arr[i] = output[i];
}
void bucket_data(uint64_t arr[], int n, uint64_t exp) {
uint64_t output[n];
int i, count[10] = { 0 };
// The main function to that sorts arr[] of size n using
// Radix Sort
void radix_sort(uint64_t arr[], int n) {
// Find the maximum number to know number of digits
uint64_t m = get_max(arr, n);
for (uint64_t exp = 1; m/exp > 0; exp *= 10)
bucket_data(arr, n, exp);
for (i = 0; i < n; i++) {
count[(arr[i] / exp) % 10]++;
}
for (i = 1; i < 10; i++) {
count[i] += count[i - 1];
}
for (i = n - 1; i >= 0; i--) {
output[count[(arr[i] / exp) % 10] - 1] = arr[i];
count[(arr[i] / exp) % 10]--;
}
for (i = 0; i < n; i++) {
arr[i] = output[i];
}
}
// The main function to that sorts arr[] of size n using
// Radix Sort
void radix_sort(uint64_t arr[], int n) {
// Find the maximum number to know number of digits
uint64_t m = get_max(arr, n);
for (uint64_t exp = 1; m / exp > 0; exp *= 10) bucket_data(arr, n, exp);
}