Merge pull request #5737 from sfc-gh-qhoang/qhoang-support-multithreaded-client-mako

Support multithreaded client mako
Kao Makino 2021-10-20 09:34:49 -07:00 committed by GitHub
commit 4f0732b510
2 changed files with 124 additions and 51 deletions
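In short: each mako worker process can now open several FDBDatabase handles, spread round-robin across one or more clusters, and each worker thread is pinned to one of those handles. -c/--cluster accepts a comma-separated list of cluster files, the new -d/--num_databases option sets how many database handles each process creates, and the new --client_threads_per_version option forwards its value to FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION. As an illustration (the cluster file names are made up), a run like mako --cluster a.cluster,b.cluster -d 4 -t 8 --client_threads_per_version 4 plus the usual workload options creates four database handles alternating between the two clusters and assigns two worker threads to each handle.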


@@ -1065,7 +1065,8 @@ void* worker_thread(void* thread_args) {
int worker_id = ((thread_args_t*)thread_args)->process->worker_id;
int thread_id = ((thread_args_t*)thread_args)->thread_id;
mako_args_t* args = ((thread_args_t*)thread_args)->process->args;
FDBDatabase* database = ((thread_args_t*)thread_args)->process->database;
size_t database_index = ((thread_args_t*)thread_args)->database_index;
FDBDatabase* database = ((thread_args_t*)thread_args)->process->databases[database_index];
fdb_error_t err;
int rc;
FDBTransaction* transaction;
@@ -1099,11 +1100,12 @@ void* worker_thread(void* thread_args) {
}
fprintf(debugme,
"DEBUG: worker_id:%d (%d) thread_id:%d (%d) (tid:%lld)\n",
"DEBUG: worker_id:%d (%d) thread_id:%d (%d) database_index:%d (tid:%lld)\n",
worker_id,
args->num_processes,
thread_id,
args->num_threads,
database_index,
(uint64_t)pthread_self());
if (args->tpsmax) {
@@ -1231,6 +1233,7 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
fprintf(debugme, "DEBUG: worker %d started\n", worker_id);
/* Everything starts from here */
err = fdb_select_api_version(args->api_version);
if (err) {
fprintf(stderr, "ERROR: Failed at %s:%d (%s)\n", __FILE__, __LINE__, fdb_get_error(err));
@@ -1291,6 +1294,17 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
}
}
if (args->client_threads_per_version > 0) {
int64_t client_threads_per_version = args->client_threads_per_version;
err = fdb_network_set_option(
FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, (uint8_t*)&client_threads_per_version, sizeof(client_threads_per_version));
if (err) {
fprintf(stderr,
"ERROR: fdb_network_set_option (FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION) (%d): %s\n",
(uint8_t*)&args->client_threads_per_version,
fdb_get_error(err));
}
}
/* Network thread must be setup before doing anything */
fprintf(debugme, "DEBUG: fdb_setup_network\n");
err = fdb_setup_network();
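One note on the option value set above: FDB's integer-valued network options expect the value as an 8-byte little-endian integer, which is why the call goes through an int64_t. If mako grows more such options, a small helper could centralize this; the sketch below is hypothetical (not part of this diff), assumes a little-endian host, and is meant to live in mako.c where fdb_c.h and the fixed-width integer types are already available:

static fdb_error_t set_int_network_option(FDBNetworkOption option, int64_t value) {
	/* FDB reads integer option values as 8 little-endian bytes */
	return fdb_network_set_option(option, (const uint8_t*)&value, sizeof(value));
}

The setup code above would then read err = set_int_network_option(FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, args->client_threads_per_version);.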
@@ -1328,11 +1342,16 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
fdb_future_destroy(f);
#else /* >= 610 */
fdb_create_database(args->cluster_file, &process.database);
#endif
if (args->disable_ryw) {
fdb_database_set_option(process.database, FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, (uint8_t*)NULL, 0);
for (size_t i = 0; i < args->num_databases; i++) {
size_t cluster_index = args->num_fdb_clusters <= 1 ? 0 : i % args->num_fdb_clusters;
fdb_create_database(args->cluster_files[cluster_index], &process.databases[i]);
fprintf(debugme, "DEBUG: creating database at cluster %s\n", args->cluster_files[cluster_index]);
if (args->disable_ryw) {
fdb_database_set_option(process.databases[i], FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, (uint8_t*)NULL, 0);
}
}
#endif
fprintf(debugme, "DEBUG: creating %d worker threads\n", args->num_threads);
worker_threads = (pthread_t*)calloc(sizeof(pthread_t), args->num_threads);
if (!worker_threads) {
@@ -1349,6 +1368,8 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
for (i = 0; i < args->num_threads; i++) {
thread_args[i].thread_id = i;
thread_args[i].database_index = i % args->num_databases;
for (int op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_TRANSACTION || op == OP_COMMIT) {
thread_args[i].block[op] = (lat_block_t*)malloc(sizeof(lat_block_t));
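The two modulo expressions, i % args->num_fdb_clusters when the databases are created and i % args->num_databases when thread_args is filled in, define the whole fan-out. A standalone sketch of just that arithmetic (no FDB calls; the numbers mirror the illustrative -c a.cluster,b.cluster -d 4 -t 8 example above):

#include <stdio.h>

int main(void) {
	int num_fdb_clusters = 2; /* two comma-separated cluster files */
	int num_databases = 4;    /* -d 4 */
	int num_threads = 8;      /* -t 8 */

	/* worker_process_main: databases are created round-robin over clusters */
	for (int i = 0; i < num_databases; i++)
		printf("database %d -> cluster %d\n", i, num_fdb_clusters <= 1 ? 0 : i % num_fdb_clusters);

	/* thread setup: threads are assigned round-robin over databases */
	for (int t = 0; t < num_threads; t++)
		printf("thread %d -> database %d\n", t, t % num_databases);
	return 0;
}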
@@ -1388,7 +1409,10 @@ failExit:
free(thread_args);
/* clean up database and cluster */
fdb_database_destroy(process.database);
for (size_t i = 0; i < args->num_databases; i++) {
fdb_database_destroy(process.databases[i]);
}
#if FDB_API_VERSION < 610
fdb_cluster_destroy(cluster);
#endif
@@ -1414,6 +1438,8 @@ int init_args(mako_args_t* args) {
if (!args)
return -1;
memset(args, 0, sizeof(mako_args_t)); /* zero-out everything */
args->num_fdb_clusters = 0;
args->num_databases = 1;
args->api_version = fdb_get_max_api_version();
args->json = 0;
args->num_processes = 1;
@@ -1446,6 +1472,7 @@ int init_args(mako_args_t* args) {
for (i = 0; i < MAX_OP; i++) {
args->txnspec.ops[i][OP_COUNT] = 0;
}
args->client_threads_per_version = 0;
args->disable_ryw = 0;
return 0;
}
@@ -1579,6 +1606,7 @@ void usage() {
printf("%-24s %s\n", "-v, --verbose", "Specify verbosity");
printf("%-24s %s\n", "-a, --api_version=API_VERSION", "Specify API_VERSION to use");
printf("%-24s %s\n", "-c, --cluster=FILE", "Specify FDB cluster file");
printf("%-24s %s\n", "-d, --num_databases=NUM_DATABASES", "Specify number of databases");
printf("%-24s %s\n", "-p, --procs=PROCS", "Specify number of worker processes");
printf("%-24s %s\n", "-t, --threads=THREADS", "Specify number of worker threads");
printf("%-24s %s\n", "-r, --rows=ROWS", "Specify number of records");
@@ -1620,50 +1648,54 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
int c;
int idx;
while (1) {
const char* short_options = "a:c:p:t:r:s:i:x:v:m:hjz";
static struct option long_options[] = { /* name, has_arg, flag, val */
{ "api_version", required_argument, NULL, 'a' },
{ "cluster", required_argument, NULL, 'c' },
{ "procs", required_argument, NULL, 'p' },
{ "threads", required_argument, NULL, 't' },
{ "rows", required_argument, NULL, 'r' },
{ "seconds", required_argument, NULL, 's' },
{ "iteration", required_argument, NULL, 'i' },
{ "keylen", required_argument, NULL, ARG_KEYLEN },
{ "vallen", required_argument, NULL, ARG_VALLEN },
{ "transaction", required_argument, NULL, 'x' },
{ "tps", required_argument, NULL, ARG_TPS },
{ "tpsmax", required_argument, NULL, ARG_TPSMAX },
{ "tpsmin", required_argument, NULL, ARG_TPSMIN },
{ "tpsinterval", required_argument, NULL, ARG_TPSINTERVAL },
{ "tpschange", required_argument, NULL, ARG_TPSCHANGE },
{ "sampling", required_argument, NULL, ARG_SAMPLING },
{ "verbose", required_argument, NULL, 'v' },
{ "mode", required_argument, NULL, 'm' },
{ "knobs", required_argument, NULL, ARG_KNOBS },
{ "loggroup", required_argument, NULL, ARG_LOGGROUP },
{ "tracepath", required_argument, NULL, ARG_TRACEPATH },
{ "trace_format", required_argument, NULL, ARG_TRACEFORMAT },
{ "streaming", required_argument, NULL, ARG_STREAMING_MODE },
{ "txntrace", required_argument, NULL, ARG_TXNTRACE },
/* no args */
{ "help", no_argument, NULL, 'h' },
{ "json", no_argument, NULL, 'j' },
{ "zipf", no_argument, NULL, 'z' },
{ "commitget", no_argument, NULL, ARG_COMMITGET },
{ "flatbuffers", no_argument, NULL, ARG_FLATBUFFERS },
{ "prefix_padding", no_argument, NULL, ARG_PREFIXPADDING },
{ "trace", no_argument, NULL, ARG_TRACE },
{ "txntagging", required_argument, NULL, ARG_TXNTAGGING },
{ "txntagging_prefix", required_argument, NULL, ARG_TXNTAGGINGPREFIX },
{ "version", no_argument, NULL, ARG_VERSION },
{ "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
{ NULL, 0, NULL, 0 }
const char* short_options = "a:c:d:p:t:r:s:i:x:v:m:hjz";
static struct option long_options[] = {
/* name, has_arg, flag, val */
{ "api_version", required_argument, NULL, 'a' },
{ "cluster", required_argument, NULL, 'c' },
{ "num_databases", optional_argument, NULL, 'd' },
{ "procs", required_argument, NULL, 'p' },
{ "threads", required_argument, NULL, 't' },
{ "rows", required_argument, NULL, 'r' },
{ "seconds", required_argument, NULL, 's' },
{ "iteration", required_argument, NULL, 'i' },
{ "keylen", required_argument, NULL, ARG_KEYLEN },
{ "vallen", required_argument, NULL, ARG_VALLEN },
{ "transaction", required_argument, NULL, 'x' },
{ "tps", required_argument, NULL, ARG_TPS },
{ "tpsmax", required_argument, NULL, ARG_TPSMAX },
{ "tpsmin", required_argument, NULL, ARG_TPSMIN },
{ "tpsinterval", required_argument, NULL, ARG_TPSINTERVAL },
{ "tpschange", required_argument, NULL, ARG_TPSCHANGE },
{ "sampling", required_argument, NULL, ARG_SAMPLING },
{ "verbose", required_argument, NULL, 'v' },
{ "mode", required_argument, NULL, 'm' },
{ "knobs", required_argument, NULL, ARG_KNOBS },
{ "loggroup", required_argument, NULL, ARG_LOGGROUP },
{ "tracepath", required_argument, NULL, ARG_TRACEPATH },
{ "trace_format", required_argument, NULL, ARG_TRACEFORMAT },
{ "streaming", required_argument, NULL, ARG_STREAMING_MODE },
{ "txntrace", required_argument, NULL, ARG_TXNTRACE },
/* no args */
{ "help", no_argument, NULL, 'h' },
{ "json", no_argument, NULL, 'j' },
{ "zipf", no_argument, NULL, 'z' },
{ "commitget", no_argument, NULL, ARG_COMMITGET },
{ "flatbuffers", no_argument, NULL, ARG_FLATBUFFERS },
{ "prefix_padding", no_argument, NULL, ARG_PREFIXPADDING },
{ "trace", no_argument, NULL, ARG_TRACE },
{ "txntagging", required_argument, NULL, ARG_TXNTAGGING },
{ "txntagging_prefix", required_argument, NULL, ARG_TXNTAGGINGPREFIX },
{ "version", no_argument, NULL, ARG_VERSION },
{ "client_threads_per_version", required_argument, NULL, ARG_CLIENT_THREADS_PER_VERSION },
{ "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
{ NULL, 0, NULL, 0 }
};
idx = 0;
c = getopt_long(argc, argv, short_options, long_options, &idx);
if (c < 0)
if (c < 0) {
break;
}
switch (c) {
case '?':
case 'h':
@@ -1672,8 +1704,17 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
case 'a':
args->api_version = atoi(optarg);
break;
case 'c':
strcpy(args->cluster_file, optarg);
case 'c': {
const char delim[] = ",";
char* cluster_file = strtok(optarg, delim);
while (cluster_file != NULL) {
strcpy(args->cluster_files[args->num_fdb_clusters++], cluster_file);
cluster_file = strtok(NULL, delim);
}
break;
}
case 'd':
args->num_databases = atoi(optarg);
break;
case 'p':
args->num_processes = atoi(optarg);
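One caveat with the cluster-file loop above: the comma-separated entries are copied with strcpy before validate_args runs, so a -c value with more than NUM_CLUSTERS_MAX entries (or an entry longer than PATH_MAX) would overrun args->cluster_files before the later checks can reject it. A bounds-checked variant of the same case, purely illustrative and not part of this diff (it assumes a negative return from parse_args is treated as a fatal argument error), might look like:

case 'c': {
	const char delim[] = ",";
	char* cluster_file = strtok(optarg, delim);
	while (cluster_file != NULL) {
		if (args->num_fdb_clusters >= NUM_CLUSTERS_MAX) {
			fprintf(stderr, "ERROR: mako accepts at most %d cluster files\n", NUM_CLUSTERS_MAX);
			return -1;
		}
		/* bounded copy; each cluster_files entry is PATH_MAX bytes */
		strncpy(args->cluster_files[args->num_fdb_clusters], cluster_file, PATH_MAX - 1);
		args->cluster_files[args->num_fdb_clusters][PATH_MAX - 1] = '\0';
		args->num_fdb_clusters++;
		cluster_file = strtok(NULL, delim);
	}
	break;
}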
@@ -1812,6 +1853,9 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
}
memcpy(args->txntagging_prefix, optarg, strlen(optarg));
break;
case ARG_CLIENT_THREADS_PER_VERSION:
args->client_threads_per_version = atoi(optarg);
break;
case ARG_DISABLE_RYW:
args->disable_ryw = 1;
break;
@@ -1858,6 +1902,28 @@ int validate_args(mako_args_t* args) {
fprintf(stderr, "ERROR: --vallen must be a positive integer\n");
return -1;
}
if (args->num_fdb_clusters > NUM_CLUSTERS_MAX) {
fprintf(stderr, "ERROR: Mako is not supported to do work to more than %d clusters\n", NUM_CLUSTERS_MAX);
return -1;
}
if (args->num_databases > NUM_DATABASES_MAX) {
fprintf(stderr, "ERROR: Mako is not supported to do work to more than %d databases\n", NUM_DATABASES_MAX);
return -1;
}
if (args->num_databases < args->num_fdb_clusters) {
fprintf(stderr,
"ERROR: --num_databases (%d) must be >= number of clusters(%d)\n",
args->num_databases,
args->num_fdb_clusters);
return -1;
}
if (args->num_threads < args->num_databases) {
fprintf(stderr,
"ERROR: --threads (%d) must be >= number of databases (%d)\n",
args->num_threads,
args->num_databases);
return -1;
}
if (args->key_length < 4 /* "mako" */ + digits(args->rows)) {
fprintf(stderr,
"ERROR: --keylen must be larger than %d to store \"mako\" prefix "


@@ -81,6 +81,7 @@ enum Arguments {
ARG_TXNTAGGING,
ARG_TXNTAGGINGPREFIX,
ARG_STREAMING_MODE,
ARG_CLIENT_THREADS_PER_VERSION,
ARG_DISABLE_RYW
};
@@ -103,6 +104,8 @@ typedef struct {
#define LOGGROUP_MAX 256
#define KNOB_MAX 256
#define TAGPREFIXLENGTH_MAX 8
#define NUM_CLUSTERS_MAX 3
#define NUM_DATABASES_MAX 10
/* benchmark parameters */
typedef struct {
@@ -125,7 +128,9 @@ typedef struct {
int commit_get;
int verbose;
mako_txnspec_t txnspec;
char cluster_file[PATH_MAX];
char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
int num_fdb_clusters;
int num_databases;
char log_group[LOGGROUP_MAX];
int prefixpadding;
int trace;
@@ -137,6 +142,7 @@ typedef struct {
int txntagging;
char txntagging_prefix[TAGPREFIXLENGTH_MAX];
FDBStreamingMode streaming_mode;
uint32_t client_threads_per_version;
int disable_ryw;
} mako_args_t;
@@ -173,14 +179,15 @@ typedef struct {
typedef struct {
int worker_id;
pid_t parent_id;
FDBDatabase* database;
mako_args_t* args;
mako_shmhdr_t* shm;
FDBDatabase* databases[NUM_DATABASES_MAX];
} process_info_t;
/* args for threads */
typedef struct {
int thread_id;
int database_index; // index of the database this thread operates on
int elem_size[MAX_OP]; /* stores the multiple of LAT_BLOCK_SIZE to check the memory allocation of each operation */
bool is_memory_allocated[MAX_OP]; /* flag specified for each operation, whether the memory was allocated to that
specific operation */