Merge branch 'master' of https://github.com/apple/foundationdb into clean-sim-test-data-loss
commit 16ae2b76e5
@@ -1065,7 +1065,8 @@ void* worker_thread(void* thread_args) {
int worker_id = ((thread_args_t*)thread_args)->process->worker_id;
int thread_id = ((thread_args_t*)thread_args)->thread_id;
mako_args_t* args = ((thread_args_t*)thread_args)->process->args;
FDBDatabase* database = ((thread_args_t*)thread_args)->process->database;
size_t database_index = ((thread_args_t*)thread_args)->database_index;
FDBDatabase* database = ((thread_args_t*)thread_args)->process->databases[database_index];
fdb_error_t err;
int rc;
FDBTransaction* transaction;

@@ -1099,11 +1100,12 @@ void* worker_thread(void* thread_args) {
}
fprintf(debugme,
"DEBUG: worker_id:%d (%d) thread_id:%d (%d) (tid:%lld)\n",
"DEBUG: worker_id:%d (%d) thread_id:%d (%d) database_index:%d (tid:%lld)\n",
worker_id,
args->num_processes,
thread_id,
args->num_threads,
database_index,
(uint64_t)pthread_self());
if (args->tpsmax) {

@@ -1231,6 +1233,7 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
fprintf(debugme, "DEBUG: worker %d started\n", worker_id);
/* Everything starts from here */
err = fdb_select_api_version(args->api_version);
if (err) {
fprintf(stderr, "ERROR: Failed at %s:%d (%s)\n", __FILE__, __LINE__, fdb_get_error(err));

@@ -1291,6 +1294,17 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
}
}
if (args->client_threads_per_version > 0) {
err = fdb_network_set_option(
FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION, (uint8_t*)&args->client_threads_per_version, sizeof(uint32_t));
if (err) {
fprintf(stderr,
"ERROR: fdb_network_set_option (FDB_NET_OPTION_CLIENT_THREADS_PER_VERSION) (%d): %s\n",
(uint8_t*)&args->client_threads_per_version,
fdb_get_error(err));
}
}
/* Network thread must be setup before doing anything */
fprintf(debugme, "DEBUG: fdb_setup_network\n");
err = fdb_setup_network();

@@ -1328,11 +1342,16 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
fdb_future_destroy(f);
#else /* >= 610 */
fdb_create_database(args->cluster_file, &process.database);
#endif
if (args->disable_ryw) {
fdb_database_set_option(process.database, FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, (uint8_t*)NULL, 0);
for (size_t i = 0; i < args->num_databases; i++) {
size_t cluster_index = args->num_fdb_clusters <= 1 ? 0 : i % args->num_fdb_clusters;
fdb_create_database(args->cluster_files[cluster_index], &process.databases[i]);
fprintf(debugme, "DEBUG: creating database at cluster %s\n", args->cluster_files[cluster_index]);
if (args->disable_ryw) {
fdb_database_set_option(process.databases[i], FDB_DB_OPTION_SNAPSHOT_RYW_DISABLE, (uint8_t*)NULL, 0);
}
}
#endif
fprintf(debugme, "DEBUG: creating %d worker threads\n", args->num_threads);
worker_threads = (pthread_t*)calloc(sizeof(pthread_t), args->num_threads);
if (!worker_threads) {

@@ -1349,6 +1368,8 @@ int worker_process_main(mako_args_t* args, int worker_id, mako_shmhdr_t* shm, pi
for (i = 0; i < args->num_threads; i++) {
thread_args[i].thread_id = i;
thread_args[i].database_index = i % args->num_databases;
for (int op = 0; op < MAX_OP; op++) {
if (args->txnspec.ops[op][OP_COUNT] > 0 || op == OP_TRANSACTION || op == OP_COMMIT) {
thread_args[i].block[op] = (lat_block_t*)malloc(sizeof(lat_block_t));

@@ -1388,7 +1409,10 @@ failExit:
free(thread_args);
/* clean up database and cluster */
fdb_database_destroy(process.database);
for (size_t i = 0; i < args->num_databases; i++) {
fdb_database_destroy(process.databases[i]);
}
#if FDB_API_VERSION < 610
fdb_cluster_destroy(cluster);
#endif

@@ -1414,6 +1438,8 @@ int init_args(mako_args_t* args) {
if (!args)
return -1;
memset(args, 0, sizeof(mako_args_t)); /* zero-out everything */
args->num_fdb_clusters = 0;
args->num_databases = 1;
args->api_version = fdb_get_max_api_version();
args->json = 0;
args->num_processes = 1;

@@ -1446,6 +1472,7 @@ int init_args(mako_args_t* args) {
for (i = 0; i < MAX_OP; i++) {
args->txnspec.ops[i][OP_COUNT] = 0;
}
args->client_threads_per_version = 0;
args->disable_ryw = 0;
return 0;
}

@@ -1579,6 +1606,7 @@ void usage() {
printf("%-24s %s\n", "-v, --verbose", "Specify verbosity");
printf("%-24s %s\n", "-a, --api_version=API_VERSION", "Specify API_VERSION to use");
printf("%-24s %s\n", "-c, --cluster=FILE", "Specify FDB cluster file");
printf("%-24s %s\n", "-d, --num_databases=NUM_DATABASES", "Specify number of databases");
printf("%-24s %s\n", "-p, --procs=PROCS", "Specify number of worker processes");
printf("%-24s %s\n", "-t, --threads=THREADS", "Specify number of worker threads");
printf("%-24s %s\n", "-r, --rows=ROWS", "Specify number of records");
@@ -1620,50 +1648,54 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
int c;
int idx;
while (1) {
const char* short_options = "a:c:p:t:r:s:i:x:v:m:hjz";
static struct option long_options[] = { /* name, has_arg, flag, val */
{ "api_version", required_argument, NULL, 'a' },
{ "cluster", required_argument, NULL, 'c' },
{ "procs", required_argument, NULL, 'p' },
{ "threads", required_argument, NULL, 't' },
{ "rows", required_argument, NULL, 'r' },
{ "seconds", required_argument, NULL, 's' },
{ "iteration", required_argument, NULL, 'i' },
{ "keylen", required_argument, NULL, ARG_KEYLEN },
{ "vallen", required_argument, NULL, ARG_VALLEN },
{ "transaction", required_argument, NULL, 'x' },
{ "tps", required_argument, NULL, ARG_TPS },
{ "tpsmax", required_argument, NULL, ARG_TPSMAX },
{ "tpsmin", required_argument, NULL, ARG_TPSMIN },
{ "tpsinterval", required_argument, NULL, ARG_TPSINTERVAL },
{ "tpschange", required_argument, NULL, ARG_TPSCHANGE },
{ "sampling", required_argument, NULL, ARG_SAMPLING },
{ "verbose", required_argument, NULL, 'v' },
{ "mode", required_argument, NULL, 'm' },
{ "knobs", required_argument, NULL, ARG_KNOBS },
{ "loggroup", required_argument, NULL, ARG_LOGGROUP },
{ "tracepath", required_argument, NULL, ARG_TRACEPATH },
{ "trace_format", required_argument, NULL, ARG_TRACEFORMAT },
{ "streaming", required_argument, NULL, ARG_STREAMING_MODE },
{ "txntrace", required_argument, NULL, ARG_TXNTRACE },
/* no args */
{ "help", no_argument, NULL, 'h' },
{ "json", no_argument, NULL, 'j' },
{ "zipf", no_argument, NULL, 'z' },
{ "commitget", no_argument, NULL, ARG_COMMITGET },
{ "flatbuffers", no_argument, NULL, ARG_FLATBUFFERS },
{ "prefix_padding", no_argument, NULL, ARG_PREFIXPADDING },
{ "trace", no_argument, NULL, ARG_TRACE },
{ "txntagging", required_argument, NULL, ARG_TXNTAGGING },
{ "txntagging_prefix", required_argument, NULL, ARG_TXNTAGGINGPREFIX },
{ "version", no_argument, NULL, ARG_VERSION },
{ "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
{ NULL, 0, NULL, 0 }
const char* short_options = "a:c:d:p:t:r:s:i:x:v:m:hjz";
static struct option long_options[] = {
/* name, has_arg, flag, val */
{ "api_version", required_argument, NULL, 'a' },
{ "cluster", required_argument, NULL, 'c' },
{ "num_databases", optional_argument, NULL, 'd' },
{ "procs", required_argument, NULL, 'p' },
{ "threads", required_argument, NULL, 't' },
{ "rows", required_argument, NULL, 'r' },
{ "seconds", required_argument, NULL, 's' },
{ "iteration", required_argument, NULL, 'i' },
{ "keylen", required_argument, NULL, ARG_KEYLEN },
{ "vallen", required_argument, NULL, ARG_VALLEN },
{ "transaction", required_argument, NULL, 'x' },
{ "tps", required_argument, NULL, ARG_TPS },
{ "tpsmax", required_argument, NULL, ARG_TPSMAX },
{ "tpsmin", required_argument, NULL, ARG_TPSMIN },
{ "tpsinterval", required_argument, NULL, ARG_TPSINTERVAL },
{ "tpschange", required_argument, NULL, ARG_TPSCHANGE },
{ "sampling", required_argument, NULL, ARG_SAMPLING },
{ "verbose", required_argument, NULL, 'v' },
{ "mode", required_argument, NULL, 'm' },
{ "knobs", required_argument, NULL, ARG_KNOBS },
{ "loggroup", required_argument, NULL, ARG_LOGGROUP },
{ "tracepath", required_argument, NULL, ARG_TRACEPATH },
{ "trace_format", required_argument, NULL, ARG_TRACEFORMAT },
{ "streaming", required_argument, NULL, ARG_STREAMING_MODE },
{ "txntrace", required_argument, NULL, ARG_TXNTRACE },
/* no args */
{ "help", no_argument, NULL, 'h' },
{ "json", no_argument, NULL, 'j' },
{ "zipf", no_argument, NULL, 'z' },
{ "commitget", no_argument, NULL, ARG_COMMITGET },
{ "flatbuffers", no_argument, NULL, ARG_FLATBUFFERS },
{ "prefix_padding", no_argument, NULL, ARG_PREFIXPADDING },
{ "trace", no_argument, NULL, ARG_TRACE },
{ "txntagging", required_argument, NULL, ARG_TXNTAGGING },
{ "txntagging_prefix", required_argument, NULL, ARG_TXNTAGGINGPREFIX },
{ "version", no_argument, NULL, ARG_VERSION },
{ "client_threads_per_version", required_argument, NULL, ARG_CLIENT_THREADS_PER_VERSION },
{ "disable_ryw", no_argument, NULL, ARG_DISABLE_RYW },
{ NULL, 0, NULL, 0 }
};
idx = 0;
c = getopt_long(argc, argv, short_options, long_options, &idx);
if (c < 0)
if (c < 0) {
break;
}
switch (c) {
case '?':
case 'h':
@@ -1672,8 +1704,17 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
case 'a':
args->api_version = atoi(optarg);
break;
case 'c':
strcpy(args->cluster_file, optarg);
case 'c': {
const char delim[] = ",";
char* cluster_file = strtok(optarg, delim);
while (cluster_file != NULL) {
strcpy(args->cluster_files[args->num_fdb_clusters++], cluster_file);
cluster_file = strtok(NULL, delim);
}
break;
}
case 'd':
args->num_databases = atoi(optarg);
break;
case 'p':
args->num_processes = atoi(optarg);

@@ -1812,6 +1853,9 @@ int parse_args(int argc, char* argv[], mako_args_t* args) {
}
memcpy(args->txntagging_prefix, optarg, strlen(optarg));
break;
case ARG_CLIENT_THREADS_PER_VERSION:
args->client_threads_per_version = atoi(optarg);
break;
case ARG_DISABLE_RYW:
args->disable_ryw = 1;
break;

@@ -1858,6 +1902,28 @@ int validate_args(mako_args_t* args) {
fprintf(stderr, "ERROR: --vallen must be a positive integer\n");
return -1;
}
if (args->num_fdb_clusters > NUM_CLUSTERS_MAX) {
fprintf(stderr, "ERROR: Mako is not supported to do work to more than %d clusters\n", NUM_CLUSTERS_MAX);
return -1;
}
if (args->num_databases > NUM_DATABASES_MAX) {
fprintf(stderr, "ERROR: Mako is not supported to do work to more than %d databases\n", NUM_DATABASES_MAX);
return -1;
}
if (args->num_databases < args->num_fdb_clusters) {
fprintf(stderr,
"ERROR: --num_databases (%d) must be >= number of clusters(%d)\n",
args->num_databases,
args->num_fdb_clusters);
return -1;
}
if (args->num_threads < args->num_databases) {
fprintf(stderr,
"ERROR: --threads (%d) must be >= number of databases (%d)\n",
args->num_threads,
args->num_databases);
return -1;
}
if (args->key_length < 4 /* "mako" */ + digits(args->rows)) {
fprintf(stderr,
"ERROR: --keylen must be larger than %d to store \"mako\" prefix "
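A minimal standalone sketch (not part of the commit; values are illustrative) of how the new multi-cluster options in the mako changes above fit together: thread i works against database (i % num_databases), and database i is opened against cluster file (i % num_fdb_clusters), mirroring the modulo logic in worker_process_main() and the -d/--cluster parsing:

#include <cstdio>

int main() {
	int num_threads = 8;      // --threads
	int num_databases = 4;    // --num_databases (-d)
	int num_fdb_clusters = 2; // number of comma-separated --cluster files

	// Database-to-cluster assignment, as in worker_process_main().
	for (int i = 0; i < num_databases; i++)
		std::printf("database %d -> cluster file %d\n", i, num_fdb_clusters <= 1 ? 0 : i % num_fdb_clusters);

	// Thread-to-database assignment, as in the thread_args setup loop.
	for (int i = 0; i < num_threads; i++)
		std::printf("thread %d -> database %d\n", i, i % num_databases);
	return 0;
}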
@@ -81,6 +81,7 @@ enum Arguments {
ARG_TXNTAGGING,
ARG_TXNTAGGINGPREFIX,
ARG_STREAMING_MODE,
ARG_CLIENT_THREADS_PER_VERSION,
ARG_DISABLE_RYW
};

@@ -103,6 +104,8 @@ typedef struct {
#define LOGGROUP_MAX 256
#define KNOB_MAX 256
#define TAGPREFIXLENGTH_MAX 8
#define NUM_CLUSTERS_MAX 3
#define NUM_DATABASES_MAX 10
/* benchmark parameters */
typedef struct {

@@ -125,7 +128,9 @@ typedef struct {
int commit_get;
int verbose;
mako_txnspec_t txnspec;
char cluster_file[PATH_MAX];
char cluster_files[NUM_CLUSTERS_MAX][PATH_MAX];
int num_fdb_clusters;
int num_databases;
char log_group[LOGGROUP_MAX];
int prefixpadding;
int trace;

@@ -137,6 +142,7 @@ typedef struct {
int txntagging;
char txntagging_prefix[TAGPREFIXLENGTH_MAX];
FDBStreamingMode streaming_mode;
uint32_t client_threads_per_version;
int disable_ryw;
} mako_args_t;

@@ -173,14 +179,15 @@ typedef struct {
typedef struct {
int worker_id;
pid_t parent_id;
FDBDatabase* database;
mako_args_t* args;
mako_shmhdr_t* shm;
FDBDatabase* databases[NUM_DATABASES_MAX];
} process_info_t;
/* args for threads */
typedef struct {
int thread_id;
int database_index; // index of the database to do work to
int elem_size[MAX_OP]; /* stores the multiple of LAT_BLOCK_SIZE to check the memory allocation of each operation */
bool is_memory_allocated[MAX_OP]; /* flag specified for each operation, whether the memory was allocated to that
specific operation */
@@ -538,6 +538,8 @@ void DLApi::runNetwork() {
hook.first(hook.second);
} catch (Error& e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
} catch (std::exception& e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()).detail("RootException", e.what());
} catch (...) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
}

@@ -1813,9 +1815,14 @@ THREAD_FUNC_RETURN runNetworkThread(void* param) {
try {
((ClientInfo*)param)->api->runNetwork();
} catch (Error& e) {
TraceEvent(SevError, "RunNetworkError").error(e);
TraceEvent(SevError, "ExternalRunNetworkError").error(e);
} catch (std::exception& e) {
TraceEvent(SevError, "ExternalRunNetworkError").error(unknown_error()).detail("RootException", e.what());
} catch (...) {
TraceEvent(SevError, "ExternalRunNetworkError").error(unknown_error());
}
TraceEvent("ExternalNetworkThreadTerminating");
THREAD_RETURN;
}

@@ -1852,6 +1859,7 @@ void MultiVersionApi::stopNetwork() {
}
lock.leave();
TraceEvent("MultiVersionStopNetwork");
localClient->api->stopNetwork();
if (!bypassMultiClientApi) {

@@ -2164,6 +2164,7 @@ void stopNetwork() {
if (!g_network)
throw network_not_setup();
TraceEvent("ClientStopNetwork");
g_network->stop();
closeTraceFile();
}
@@ -454,6 +454,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( SIM_SHUTDOWN_TIMEOUT, 10 );
init( SHUTDOWN_TIMEOUT, 600 ); if( randomize && BUGGIFY ) SHUTDOWN_TIMEOUT = 60.0;
init( MASTER_SPIN_DELAY, 1.0 ); if( randomize && BUGGIFY ) MASTER_SPIN_DELAY = 10.0;
init( CC_PRUNE_CLIENTS_INTERVAL, 60.0 );
init( CC_CHANGE_DELAY, 0.1 );
init( CC_CLASS_DELAY, 0.01 );
init( WAIT_FOR_GOOD_RECRUITMENT_DELAY, 1.0 );

@@ -740,6 +741,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( FASTRESTORE_EXPENSIVE_VALIDATION, false ); if( randomize && BUGGIFY ) { FASTRESTORE_EXPENSIVE_VALIDATION = deterministicRandom()->random01() < 0.5 ? true : false;}
init( FASTRESTORE_WRITE_BW_MB, 70 ); if( randomize && BUGGIFY ) { FASTRESTORE_WRITE_BW_MB = deterministicRandom()->random01() < 0.5 ? 2 : 100;}
init( FASTRESTORE_RATE_UPDATE_SECONDS, 1.0 ); if( randomize && BUGGIFY ) { FASTRESTORE_RATE_UPDATE_SECONDS = deterministicRandom()->random01() < 0.5 ? 0.1 : 2;}
init( FASTRESTORE_DUMP_INSERT_RANGE_VERSION, false );
init( REDWOOD_DEFAULT_PAGE_SIZE, 4096 );
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );

@@ -377,6 +377,7 @@ public:
double SIM_SHUTDOWN_TIMEOUT;
double SHUTDOWN_TIMEOUT;
double MASTER_SPIN_DELAY;
double CC_PRUNE_CLIENTS_INTERVAL;
double CC_CHANGE_DELAY;
double CC_CLASS_DELAY;
double WAIT_FOR_GOOD_RECRUITMENT_DELAY;

@@ -688,6 +689,8 @@ public:
bool FASTRESTORE_EXPENSIVE_VALIDATION; // when set true, performance will be heavily affected
double FASTRESTORE_WRITE_BW_MB; // target aggregated write bandwidth from all appliers
double FASTRESTORE_RATE_UPDATE_SECONDS; // how long to update appliers target write rate
bool FASTRESTORE_DUMP_INSERT_RANGE_VERSION; // Dump all the range version after insertion. This is for debugging
// purpose.
int REDWOOD_DEFAULT_PAGE_SIZE; // Page size for new Redwood files
int REDWOOD_DEFAULT_EXTENT_SIZE; // Extent size for new Redwood files
@@ -443,7 +443,14 @@ void ThreadSafeApi::runNetwork() {
try {
::runNetwork();
} catch (Error& e) {
TraceEvent(SevError, "RunNetworkError").error(e);
runErr = e;
} catch (std::exception& e) {
runErr = unknown_error();
TraceEvent(SevError, "RunNetworkError").error(unknown_error()).detail("RootException", e.what());
} catch (...) {
runErr = unknown_error();
TraceEvent(SevError, "RunNetworkError").error(unknown_error());
}
for (auto& hook : threadCompletionHooks) {

@@ -451,6 +458,8 @@ void ThreadSafeApi::runNetwork() {
hook.first(hook.second);
} catch (Error& e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(e);
} catch (std::exception& e) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error()).detail("RootException", e.what());
} catch (...) {
TraceEvent(SevError, "NetworkShutdownHookError").error(unknown_error());
}

@@ -459,6 +468,8 @@ void ThreadSafeApi::runNetwork() {
if (runErr.present()) {
throw runErr.get();
}
TraceEvent("RunNetworkTerminating");
}
void ThreadSafeApi::stopNetwork() {

@@ -210,6 +210,7 @@ public:
void operator=(RangeMap&& r) noexcept { map = std::move(r.map); }
// void clear( const Val& value ) { ranges.clear(); ranges.insert(std::make_pair(Key(),value)); }
void clear() { map.clear(); }
void insert(const Range& keys, const Val& value);
@@ -133,6 +133,8 @@ public:
int logGenerations;
bool cachePopulated;
std::map<NetworkAddress, std::pair<double, OpenDatabaseRequest>> clientStatus;
Future<Void> clientCounter;
int clientCount;
DBInfo()
: clientInfo(new AsyncVar<ClientDBInfo>()), serverInfo(new AsyncVar<ServerDBInfo>()),

@@ -143,7 +145,9 @@ public:
EnableLocalityLoadBalance::True,
TaskPriority::DefaultEndpoint,
LockAware::True)), // SOMEDAY: Locality!
unfinishedRecoveries(0), logGenerations(0), cachePopulated(false) {}
unfinishedRecoveries(0), logGenerations(0), cachePopulated(false), clientCount(0) {
clientCounter = countClients(this);
}
void setDistributor(const DataDistributorInterface& interf) {
auto newInfo = serverInfo->get();

@@ -172,6 +176,22 @@ public:
}
serverInfo->set(newInfo);
}
ACTOR static Future<Void> countClients(DBInfo* self) {
loop {
wait(delay(SERVER_KNOBS->CC_PRUNE_CLIENTS_INTERVAL));
self->clientCount = 0;
for (auto itr = self->clientStatus.begin(); itr != self->clientStatus.end();) {
if (now() - itr->second.first < 2 * SERVER_KNOBS->COORDINATOR_REGISTER_INTERVAL) {
self->clientCount += itr->second.second.clientCount;
++itr;
} else {
itr = self->clientStatus.erase(itr);
}
}
}
}
};
struct UpdateWorkerList {

@@ -3174,6 +3194,8 @@ public:
serverInfo.myLocality = locality;
db.serverInfo->set(serverInfo);
cx = openDBOnServer(db.serverInfo, TaskPriority::DefaultEndpoint, LockAware::True);
specialCounter(clusterControllerMetrics, "ClientCount", [this]() { return db.clientCount; });
}
~ClusterControllerData() {
@@ -6297,7 +6297,18 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
self->teamCollection = nullptr;
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
wait(shards.clearAsync());
if (err.code() == error_code_actor_cancelled) {
// When cancelled, we cannot clear asyncronously because
// this will result in invalid memory access. This should only
// be an issue in simulation.
if (!g_network->isSimulated()) {
TraceEvent(SevWarnAlways, "DataDistributorCancelled");
}
shards.clear();
throw e;
} else {
wait(shards.clearAsync());
}
TraceEvent("DataDistributorTeamCollectionsDestroyed").error(err);
if (removeFailedServer.getFuture().isReady() && !removeFailedServer.getFuture().isError()) {
TraceEvent("RemoveFailedServer", removeFailedServer.getFuture().get()).error(err);
@@ -999,8 +999,8 @@ public:
return endLocation();
}
Future<Void> getError() override { return rawQueue->getError(); }
Future<Void> onClosed() override { return rawQueue->onClosed(); }
Future<Void> getError() const override { return rawQueue->getError(); }
Future<Void> onClosed() const override { return rawQueue->onClosed(); }
void dispose() override {
TraceEvent("DQDestroy", dbgid)

@@ -1551,8 +1551,8 @@ public:
popped(0), committed(0){};
// IClosable
Future<Void> getError() override { return queue->getError(); }
Future<Void> onClosed() override { return queue->onClosed(); }
Future<Void> getError() const override { return queue->getError(); }
Future<Void> onClosed() const override { return queue->onClosed(); }
void dispose() override {
queue->dispose();
delete this;
@@ -30,10 +30,12 @@ public:
// IClosable is a base interface for any disk-backed data structure that needs to support asynchronous errors,
// shutdown and deletion
virtual Future<Void> getError() = 0; // asynchronously throws an error if there is an internal error. Never set
// inside (on the stack of) a call to another API function on this object.
virtual Future<Void> onClosed() = 0; // the future is set to Void when this is totally shut down after dispose() or
// close(). But this function cannot be called after dispose or close!
virtual Future<Void> getError()
const = 0; // asynchronously throws an error if there is an internal error. Never set
// inside (on the stack of) a call to another API function on this object.
virtual Future<Void> onClosed()
const = 0; // the future is set to Void when this is totally shut down after dispose() or
// close(). But this function cannot be called after dispose or close!
virtual void dispose() = 0; // permanently delete the data AND invalidate this interface
virtual void close() = 0; // invalidate this interface, but do not delete the data. Outstanding operations may or
// may not take effect in the background.

@@ -47,16 +49,30 @@ public:
virtual Future<Void> commit(
bool sequential = false) = 0; // returns when prior sets and clears are (atomically) durable
virtual Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) = 0;
enum class ReadType {
EAGER,
FETCH,
LOW,
NORMAL,
HIGH,
};
virtual Future<Optional<Value>> readValue(KeyRef key,
ReadType type = ReadType::NORMAL,
Optional<UID> debugID = Optional<UID>()) = 0;
// Like readValue(), but returns only the first maxLength bytes of the value if it is longer
virtual Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
ReadType type = ReadType::NORMAL,
Optional<UID> debugID = Optional<UID>()) = 0;
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
virtual Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) = 0;
virtual Future<RangeResult> readRange(KeyRangeRef keys,
int rowLimit = 1 << 30,
int byteLimit = 1 << 30,
ReadType type = ReadType::NORMAL) = 0;
// To debug MEMORY_RADIXTREE type ONLY
// Returns (1) how many key & value pairs have been inserted (2) how many nodes have been created (3) how many
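A self-contained sketch (hypothetical types, not the FDB interface itself) of the pattern the IKeyValueStore change above introduces: each read method gains a ReadType parameter that defaults to NORMAL, so most callers are unchanged while data-movement paths can tag their reads (for example as FETCH):

#include <cstdio>
#include <string>

struct KVStore {
	// Mirrors the ReadType values added to IKeyValueStore in the hunk above.
	enum class ReadType { EAGER, FETCH, LOW, NORMAL, HIGH };

	std::string readValue(const std::string& key, ReadType type = ReadType::NORMAL) {
		std::printf("read %s with priority %d\n", key.c_str(), static_cast<int>(type));
		return "value";
	}
};

int main() {
	KVStore store;
	store.readValue("foo");                           // ordinary point read, defaults to NORMAL
	store.readValue("bar", KVStore::ReadType::FETCH); // read issued on behalf of data movement
	return 0;
}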
@@ -35,8 +35,8 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
KeyValueStoreCompressTestData(IKeyValueStore* store) : store(store) {}
Future<Void> getError() override { return store->getError(); }
Future<Void> onClosed() override { return store->onClosed(); }
Future<Void> getError() const override { return store->getError(); }
Future<Void> onClosed() const override { return store->onClosed(); }
void dispose() override {
store->dispose();

@@ -56,7 +56,7 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
void clear(KeyRangeRef range, const Arena* arena = nullptr) override { store->clear(range, arena); }
Future<Void> commit(bool sequential = false) override { return store->commit(sequential); }
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) override {
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override {
return doReadValue(store, key, debugID);
}

@@ -66,19 +66,20 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
// reason, you will need to fix this.
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
IKeyValueStore::ReadType,
Optional<UID> debugID) override {
return doReadValuePrefix(store, key, maxLength, debugID);
}
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
return doReadRange(store, keys, rowLimit, byteLimit);
}
private:
ACTOR static Future<Optional<Value>> doReadValue(IKeyValueStore* store, Key key, Optional<UID> debugID) {
Optional<Value> v = wait(store->readValue(key, debugID));
Optional<Value> v = wait(store->readValue(key, IKeyValueStore::ReadType::NORMAL, debugID));
if (!v.present())
return v;
return unpack(v.get());
@@ -43,8 +43,8 @@ public:
bool exactRecovery);
// IClosable
Future<Void> getError() override { return log->getError(); }
Future<Void> onClosed() override { return log->onClosed(); }
Future<Void> getError() const override { return log->getError(); }
Future<Void> onClosed() const override { return log->onClosed(); }
void dispose() override {
recovering.cancel();
log->dispose();

@@ -194,7 +194,7 @@ public:
return c;
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) override {
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())

@@ -208,7 +208,8 @@ public:
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
IKeyValueStore::ReadType,
Optional<UID> debugID) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())

@@ -227,7 +228,7 @@ public:
// If rowLimit>=0, reads first rows sorted ascending, otherwise reads last rows sorted descending
// The total size of the returned value (less the last entry) will be less than byteLimit
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
if (recovering.isError())
throw recovering.getError();
if (!recovering.isReady())

@@ -826,18 +827,18 @@ private:
ACTOR static Future<Optional<Value>> waitAndReadValue(KeyValueStoreMemory* self, Key key) {
wait(self->recovering);
return self->readValue(key).get();
return static_cast<IKeyValueStore*>(self)->readValue(key).get();
}
ACTOR static Future<Optional<Value>> waitAndReadValuePrefix(KeyValueStoreMemory* self, Key key, int maxLength) {
wait(self->recovering);
return self->readValuePrefix(key, maxLength).get();
return static_cast<IKeyValueStore*>(self)->readValuePrefix(key, maxLength).get();
}
ACTOR static Future<RangeResult> waitAndReadRange(KeyValueStoreMemory* self,
KeyRange keys,
int rowLimit,
int byteLimit) {
wait(self->recovering);
return self->readRange(keys, rowLimit, byteLimit).get();
return static_cast<IKeyValueStore*>(self)->readRange(keys, rowLimit, byteLimit).get();
}
ACTOR static Future<Void> waitAndCommit(KeyValueStoreMemory* self, bool sequential) {
wait(self->recovering);
@@ -581,7 +581,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
}
Future<Void> getError() override { return errorPromise.getFuture(); }
Future<Void> getError() const override { return errorPromise.getFuture(); }
ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
// The metrics future retains a reference to the DB, so stop it before we delete it.

@@ -600,7 +600,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
delete self;
}
Future<Void> onClosed() override { return closePromise.getFuture(); }
Future<Void> onClosed() const override { return closePromise.getFuture(); }
void dispose() override { doClose(this, true); }

@@ -645,21 +645,24 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return res;
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override {
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override {
auto a = new Reader::ReadValueAction(key, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override {
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
IKeyValueStore::ReadType,
Optional<UID> debugID) override {
auto a = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto res = a->result.getFuture();
readThreads->post(a);
return res;
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
auto a = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto res = a->result.getFuture();
readThreads->post(a);
@@ -1567,8 +1567,8 @@ public:
void dispose() override { doClose(this, true); }
void close() override { doClose(this, false); }
Future<Void> getError() override { return delayed(readThreads->getError() || writeThread->getError()); }
Future<Void> onClosed() override { return stopped.getFuture(); }
Future<Void> getError() const override { return delayed(readThreads->getError() || writeThread->getError()); }
Future<Void> onClosed() const override { return stopped.getFuture(); }
KeyValueStoreType getType() const override { return type; }
StorageBytes getStorageBytes() const override;

@@ -1577,9 +1577,12 @@ public:
void clear(KeyRangeRef range, const Arena* arena = nullptr) override;
Future<Void> commit(bool sequential = false) override;
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID) override;
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) override;
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override;
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override;
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
IKeyValueStore::ReadType,
Optional<UID> debugID) override;
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override;
KeyValueStoreSQLite(std::string const& filename,
UID logID,

@@ -2192,21 +2195,27 @@ Future<Void> KeyValueStoreSQLite::commit(bool sequential) {
writeThread->post(p);
return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValue(KeyRef key, Optional<UID> debugID) {
Future<Optional<Value>> KeyValueStoreSQLite::readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) {
++readsRequested;
auto p = new Reader::ReadValueAction(key, debugID);
auto f = p->result.getFuture();
readThreads->post(p);
return f;
}
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID) {
Future<Optional<Value>> KeyValueStoreSQLite::readValuePrefix(KeyRef key,
int maxLength,
IKeyValueStore::ReadType,
Optional<UID> debugID) {
++readsRequested;
auto p = new Reader::ReadValuePrefixAction(key, maxLength, debugID);
auto f = p->result.getFuture();
readThreads->post(p);
return f;
}
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys, int rowLimit, int byteLimit) {
Future<RangeResult> KeyValueStoreSQLite::readRange(KeyRangeRef keys,
int rowLimit,
int byteLimit,
IKeyValueStore::ReadType) {
++readsRequested;
auto p = new Reader::ReadRangeAction(keys, rowLimit, byteLimit);
auto f = p->result.getFuture();
@@ -218,11 +218,11 @@ Future<Void> LogSystemDiskQueueAdapter::commit() {
return cm.acknowledge.getFuture();
}
Future<Void> LogSystemDiskQueueAdapter::getError() {
Future<Void> LogSystemDiskQueueAdapter::getError() const {
return Void();
}
Future<Void> LogSystemDiskQueueAdapter::onClosed() {
Future<Void> LogSystemDiskQueueAdapter::onClosed() const {
return Void();
}

@@ -88,8 +88,8 @@ public:
Future<CommitMessage> getCommitMessage();
// IClosable interface
Future<Void> getError() override;
Future<Void> onClosed() override;
Future<Void> getError() const override;
Future<Void> onClosed() const override;
void dispose() override;
void close() override;
@@ -157,8 +157,8 @@ public:
Future<Void> commit() { return queue->commit(); }
// Implements IClosable
Future<Void> getError() override { return queue->getError(); }
Future<Void> onClosed() override { return queue->onClosed(); }
Future<Void> getError() const override { return queue->getError(); }
Future<Void> onClosed() const override { return queue->onClosed(); }
void dispose() override {
queue->dispose();
delete this;

@@ -123,8 +123,8 @@ public:
Future<Void> commit() { return queue->commit(); }
// Implements IClosable
Future<Void> getError() override { return queue->getError(); }
Future<Void> onClosed() override { return queue->onClosed(); }
Future<Void> getError() const override { return queue->getError(); }
Future<Void> onClosed() const override { return queue->onClosed(); }
void dispose() override {
queue->dispose();
delete this;

@@ -127,8 +127,8 @@ public:
Future<Void> commit() { return queue->commit(); }
// Implements IClosable
Future<Void> getError() override { return queue->getError(); }
Future<Void> onClosed() override { return queue->onClosed(); }
Future<Void> getError() const override { return queue->getError(); }
Future<Void> onClosed() const override { return queue->onClosed(); }
void dispose() override {
queue->dispose();
delete this;
@@ -56,11 +56,11 @@ IKeyValueStore* OnDemandStore::operator->() {
return get();
}
Future<Void> OnDemandStore::getError() {
Future<Void> OnDemandStore::getError() const {
return onErr(err.getFuture());
}
Future<Void> OnDemandStore::onClosed() {
Future<Void> OnDemandStore::onClosed() const {
return store->onClosed();
}

@@ -41,8 +41,8 @@ public:
bool exists() const;
IKeyValueStore* operator->();
Future<Void> getError() override;
Future<Void> onClosed() override;
Future<Void> getError() const override;
Future<Void> onClosed() const override;
void dispose() override;
void close() override;
};
@@ -638,6 +638,11 @@ ACTOR Future<Void> waitForQuietDatabase(Database cx,
if (g_network->isSimulated())
wait(delay(5.0));
TraceEvent("QuietDatabaseWaitingOnFullRecovery").log();
while (dbInfo->get().recoveryState != RecoveryState::FULLY_RECOVERED) {
wait(dbInfo->onChange());
}
// The quiet database check (which runs at the end of every test) will always time out due to active data movement.
// To get around this, quiet Database will disable the perpetual wiggle in the setup phase.
@@ -856,18 +856,20 @@ ACTOR static Future<Void> insertRangeVersion(KeyRangeMap<Version>* pRangeVersion
r->value() = std::max(r->value(), file->version);
}
// Dump the new key ranges
ranges = pRangeVersions->ranges();
int i = 0;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
TraceEvent(SevDebug, "RangeVersionsAfterUpdate")
.detail("File", file->toString())
.detail("FileRange", fileRange.toString())
.detail("FileVersion", file->version)
.detail("RangeIndex", i++)
.detail("RangeBegin", r->begin())
.detail("RangeEnd", r->end())
.detail("RangeVersion", r->value());
if (SERVER_KNOBS->FASTRESTORE_DUMP_INSERT_RANGE_VERSION) {
// Dump the new key ranges for debugging purpose.
ranges = pRangeVersions->ranges();
int i = 0;
for (auto r = ranges.begin(); r != ranges.end(); ++r) {
TraceEvent(SevDebug, "RangeVersionsAfterUpdate")
.detail("File", file->toString())
.detail("FileRange", fileRange.toString())
.detail("FileVersion", file->version)
.detail("RangeIndex", i++)
.detail("RangeBegin", r->begin())
.detail("RangeEnd", r->end())
.detail("RangeVersion", r->value());
}
}
return Void();
@@ -128,8 +128,8 @@ public:
Future<Void> commit() { return queue->commit(); }
// Implements IClosable
Future<Void> getError() override { return queue->getError(); }
Future<Void> onClosed() override { return queue->onClosed(); }
Future<Void> getError() const override { return queue->getError(); }
Future<Void> onClosed() const override { return queue->onClosed(); }
void dispose() override {
queue->dispose();
delete this;
@@ -3403,9 +3403,9 @@ public:
void close() override { shutdown(this, false); }
Future<Void> getError() override { return errorPromise.getFuture(); }
Future<Void> getError() const override { return errorPromise.getFuture(); }
Future<Void> onClosed() override { return closedPromise.getFuture(); }
Future<Void> onClosed() const override { return closedPromise.getFuture(); }
StorageBytes getStorageBytes() const override {
int64_t free;

@@ -4477,9 +4477,9 @@ public:
// All async opts on the btree are based on pager reads, writes, and commits, so
// we can mostly forward these next few functions to the pager
Future<Void> getError() { return m_pager->getError(); }
Future<Void> getError() const { return m_pager->getError(); }
Future<Void> onClosed() { return m_pager->onClosed(); }
Future<Void> onClosed() const { return m_pager->onClosed(); }
void close_impl(bool dispose) {
auto* pager = m_pager;

@@ -7052,7 +7052,7 @@ public:
void dispose() override { shutdown(this, true); }
Future<Void> onClosed() override { return m_closed.getFuture(); }
Future<Void> onClosed() const override { return m_closed.getFuture(); }
Future<Void> commit(bool sequential = false) override {
Future<Void> c = m_tree->commit(m_nextCommitVersion);

@@ -7066,7 +7066,7 @@ public:
StorageBytes getStorageBytes() const override { return m_tree->getStorageBytes(); }
Future<Void> getError() override { return delayed(m_error.getFuture()); };
Future<Void> getError() const override { return delayed(m_error.getFuture()); };
void clear(KeyRangeRef range, const Arena* arena = 0) override {
debug_printf("CLEAR %s\n", printable(range).c_str());

@@ -7078,7 +7078,7 @@ public:
m_tree->set(keyValue);
}
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) override {
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit, int byteLimit, IKeyValueStore::ReadType) override {
debug_printf("READRANGE %s\n", printable(keys).c_str());
return catchError(readRange_impl(this, keys, rowLimit, byteLimit));
}

@@ -7245,13 +7245,14 @@ public:
return Optional<Value>();
}
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) override {
Future<Optional<Value>> readValue(KeyRef key, IKeyValueStore::ReadType, Optional<UID> debugID) override {
return catchError(readValue_impl(this, key, debugID));
}
Future<Optional<Value>> readValuePrefix(KeyRef key,
int maxLength,
Optional<UID> debugID = Optional<UID>()) override {
IKeyValueStore::ReadType,
Optional<UID> debugID) override {
return catchError(map(readValue_impl(this, key, debugID), [maxLength](Optional<Value> v) {
if (v.present() && v.get().size() > maxLength) {
v.get().contents() = v.get().substr(0, maxLength);
@ -195,15 +195,25 @@ struct StorageServerDisk {
|
|||
Future<Void> commit() { return storage->commit(); }
|
||||
|
||||
// SOMEDAY: Put readNextKeyInclusive in IKeyValueStore
|
||||
Future<Key> readNextKeyInclusive(KeyRef key) { return readFirstKey(storage, KeyRangeRef(key, allKeys.end)); }
|
||||
Future<Optional<Value>> readValue(KeyRef key, Optional<UID> debugID = Optional<UID>()) {
|
||||
return storage->readValue(key, debugID);
|
||||
Future<Key> readNextKeyInclusive(KeyRef key, IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
|
||||
return readFirstKey(storage, KeyRangeRef(key, allKeys.end), type);
|
||||
}
|
||||
Future<Optional<Value>> readValuePrefix(KeyRef key, int maxLength, Optional<UID> debugID = Optional<UID>()) {
|
||||
return storage->readValuePrefix(key, maxLength, debugID);
|
||||
Future<Optional<Value>> readValue(KeyRef key,
|
||||
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
|
||||
Optional<UID> debugID = Optional<UID>()) {
|
||||
return storage->readValue(key, type, debugID);
|
||||
}
|
||||
Future<RangeResult> readRange(KeyRangeRef keys, int rowLimit = 1 << 30, int byteLimit = 1 << 30) {
|
||||
return storage->readRange(keys, rowLimit, byteLimit);
|
||||
Future<Optional<Value>> readValuePrefix(KeyRef key,
|
||||
int maxLength,
|
||||
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL,
|
||||
Optional<UID> debugID = Optional<UID>()) {
|
||||
return storage->readValuePrefix(key, maxLength, type, debugID);
|
||||
}
|
||||
Future<RangeResult> readRange(KeyRangeRef keys,
|
||||
int rowLimit = 1 << 30,
|
||||
int byteLimit = 1 << 30,
|
||||
IKeyValueStore::ReadType type = IKeyValueStore::ReadType::NORMAL) {
|
||||
return storage->readRange(keys, rowLimit, byteLimit, type);
|
||||
}
|
||||
|
||||
KeyValueStoreType getKeyValueStoreType() const { return storage->getType(); }
|
||||
|
@ -216,8 +226,8 @@ private:
|
|||
|
||||
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
|
||||
|
||||
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range) {
|
||||
RangeResult r = wait(storage->readRange(range, 1));
|
||||
ACTOR static Future<Key> readFirstKey(IKeyValueStore* storage, KeyRangeRef range, IKeyValueStore::ReadType type) {
|
||||
RangeResult r = wait(storage->readRange(range, 1, 1 << 30, type));
|
||||
if (r.size())
|
||||
return r[0].key;
|
||||
else
|
||||
|
@ -1282,7 +1292,7 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
|
|||
path = 1;
|
||||
} else if (!i || !i->isClearTo() || i->getEndKey() <= req.key) {
|
||||
path = 2;
|
||||
Optional<Value> vv = wait(data->storage.readValue(req.key, req.debugID));
|
||||
Optional<Value> vv = wait(data->storage.readValue(req.key, IKeyValueStore::ReadType::NORMAL, req.debugID));
|
||||
// Validate that while we were reading the data we didn't lose the version or shard
|
||||
if (version < data->storageVersion()) {
|
||||
TEST(true); // transaction_too_old after readValue
|
||||
|
@ -1631,7 +1641,8 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
|||
KeyRange range,
|
||||
int limit,
|
||||
int* pLimitBytes,
|
||||
SpanID parentSpan) {
|
||||
SpanID parentSpan,
|
||||
IKeyValueStore::ReadType type) {
|
||||
state GetKeyValuesReply result;
|
||||
state StorageServer::VersionedData::ViewAtVersion view = data->data().at(version);
|
||||
state StorageServer::VersionedData::iterator vCurrent = view.end();
|
||||
|
@ -1695,7 +1706,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
|||
// Read the data on disk up to vCurrent (or the end of the range)
|
||||
readEnd = vCurrent ? std::min(vCurrent.key(), range.end) : range.end;
|
||||
RangeResult atStorageVersion =
|
||||
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
|
||||
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
|
||||
|
||||
ASSERT(atStorageVersion.size() <= limit);
|
||||
if (data->storageVersion() > version)
|
||||
|
@ -1776,7 +1787,7 @@ ACTOR Future<GetKeyValuesReply> readRange(StorageServer* data,
|
|||
readBegin = vCurrent ? std::max(vCurrent->isClearTo() ? vCurrent->getEndKey() : vCurrent.key(), range.begin)
|
||||
: range.begin;
|
||||
RangeResult atStorageVersion =
|
||||
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes));
|
||||
wait(data->storage.readRange(KeyRangeRef(readBegin, readEnd), limit, *pLimitBytes, type));
|
||||
|
||||
ASSERT(atStorageVersion.size() <= -limit);
|
||||
if (data->storageVersion() > version)
|
||||
|
@ -1833,7 +1844,8 @@ ACTOR Future<Key> findKey(StorageServer* data,
|
|||
Version version,
|
||||
KeyRange range,
|
||||
int* pOffset,
|
||||
SpanID parentSpan)
|
||||
SpanID parentSpan,
|
||||
IKeyValueStore::ReadType type)
|
||||
// Attempts to find the key indicated by sel in the data at version, within range.
|
||||
// Precondition: selectorInRange(sel, range)
|
||||
// If it is found, offset is set to 0 and a key is returned which falls inside range.
|
||||
|
@ -1871,7 +1883,8 @@ ACTOR Future<Key> findKey(StorageServer* data,
|
|||
forward ? KeyRangeRef(sel.getKey(), range.end) : KeyRangeRef(range.begin, keyAfter(sel.getKey())),
|
||||
(distance + skipEqualKey) * sign,
|
||||
&maxBytes,
|
||||
span.context));
|
||||
span.context,
|
||||
type));
|
||||
state bool more = rep.more && rep.data.size() != distance + skipEqualKey;
|
||||
|
||||
// If we get only one result in the reverse direction as a result of the data being too large, we could get stuck in
|
||||
|
@ -1879,8 +1892,8 @@ ACTOR Future<Key> findKey(StorageServer* data,
|
|||
if (more && !forward && rep.data.size() == 1) {
|
||||
TEST(true); // Reverse key selector returned only one result in range read
|
||||
maxBytes = std::numeric_limits<int>::max();
|
||||
GetKeyValuesReply rep2 = wait(
|
||||
readRange(data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span.context));
|
||||
GetKeyValuesReply rep2 = wait(readRange(
|
||||
data, version, KeyRangeRef(range.begin, keyAfter(sel.getKey())), -2, &maxBytes, span.context, type));
|
||||
rep = rep2;
|
||||
more = rep.more && rep.data.size() != distance + skipEqualKey;
|
||||
ASSERT(rep.data.size() == 2 || !more);
|
||||
|
@ -1945,6 +1958,8 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
|
|||
{
|
||||
state Span span("SS:getKeyValues"_loc, { req.spanContext });
|
||||
state int64_t resultSize = 0;
|
||||
state IKeyValueStore::ReadType type =
|
||||
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;
|
||||
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
|
||||
|
||||
++data->counters.getRangeQueries;
|
||||
|
@ -1989,10 +2004,10 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
|
|||
state int offset2;
|
||||
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
|
||||
? Future<Key>(req.begin.getKey())
|
||||
: findKey(data, req.begin, version, shard, &offset1, span.context);
|
||||
: findKey(data, req.begin, version, shard, &offset1, span.context, type);
|
||||
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual()
|
||||
? Future<Key>(req.end.getKey())
|
||||
: findKey(data, req.end, version, shard, &offset2, span.context);
|
||||
: findKey(data, req.end, version, shard, &offset2, span.context, type);
|
||||
state Key begin = wait(fBegin);
|
||||
state Key end = wait(fEnd);
|
||||
|
||||
|
@ -2032,8 +2047,8 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
|
|||
} else {
|
||||
state int remainingLimitBytes = req.limitBytes;
|
||||
|
||||
GetKeyValuesReply _r =
|
||||
wait(readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context));
GetKeyValuesReply _r = wait(
readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes, span.context, type));
GetKeyValuesReply r = _r;

if (req.debugID.present())

@ -2110,6 +2125,8 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
{
state Span span("SS:getKeyValuesStream"_loc, { req.spanContext });
state int64_t resultSize = 0;
state IKeyValueStore::ReadType type =
req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL;

req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES);
++data->counters.getRangeStreamQueries;

@ -2155,10 +2172,10 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
? Future<Key>(req.begin.getKey())
: findKey(data, req.begin, version, shard, &offset1, span.context);
: findKey(data, req.begin, version, shard, &offset1, span.context, type);
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual()
? Future<Key>(req.end.getKey())
: findKey(data, req.end, version, shard, &offset2, span.context);
: findKey(data, req.end, version, shard, &offset2, span.context, type);
state Key begin = wait(fBegin);
state Key end = wait(fEnd);
if (req.debugID.present())

@ -2207,7 +2224,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
? 1
: CLIENT_KNOBS->REPLY_BYTE_LIMIT;
GetKeyValuesReply _r =
wait(readRange(data, version, KeyRangeRef(begin, end), req.limit, &byteLimit, span.context));
wait(readRange(data, version, KeyRangeRef(begin, end), req.limit, &byteLimit, span.context, type));
GetKeyValuesStreamReply r(_r);

if (req.debugID.present())

@ -2308,7 +2325,8 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state KeyRange shard = getShardKeyRange(data, req.sel);

state int offset;
Key k = wait(findKey(data, req.sel, version, shard, &offset, req.spanContext));
Key k =
wait(findKey(data, req.sel, version, shard, &offset, req.spanContext, IKeyValueStore::ReadType::NORMAL));

data->checkChangeCounter(
changeCounter, KeyRangeRef(std::min<KeyRef>(req.sel.getKey(), k), std::max<KeyRef>(req.sel.getKey(), k)));

@ -2406,7 +2424,7 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)
if (SERVER_KNOBS->ENABLE_CLEAR_RANGE_EAGER_READS) {
std::vector<Future<Key>> keyEnd(eager->keyBegin.size());
for (int i = 0; i < keyEnd.size(); i++)
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i]);
keyEnd[i] = data->storage.readNextKeyInclusive(eager->keyBegin[i], IKeyValueStore::ReadType::EAGER);

state Future<std::vector<Key>> futureKeyEnds = getAll(keyEnd);
state std::vector<Key> keyEndVal = wait(futureKeyEnds);

@ -2415,7 +2433,8 @@ ACTOR Future<Void> doEagerReads(StorageServer* data, UpdateEagerReadInfo* eager)

std::vector<Future<Optional<Value>>> value(eager->keys.size());
for (int i = 0; i < value.size(); i++)
value[i] = data->storage.readValuePrefix(eager->keys[i].first, eager->keys[i].second);
value[i] =
data->storage.readValuePrefix(eager->keys[i].first, eager->keys[i].second, IKeyValueStore::ReadType::EAGER);

state Future<std::vector<Optional<Value>>> futureValues = getAll(value);
std::vector<Optional<Value>> optionalValues = wait(futureValues);
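Taken together, the storage server hunks above thread an IKeyValueStore::ReadType hint (NORMAL for ordinary requests, FETCH for fetchKeys-driven reads, EAGER for update-time eager reads) from each request handler down into every readRange, findKey, readNextKeyInclusive, and readValuePrefix call, as a trailing parameter so the key-value store can distinguish the read classes. A minimal standalone sketch of that pattern follows; the names (ReadType, SimpleStore, handleGet) are illustrative and not FoundationDB's actual classes.

// Sketch only: a read-priority hint plumbed through a storage API as a
// defaulted trailing parameter, assuming hypothetical names.
#include <optional>
#include <string>

enum class ReadType { EAGER, FETCH, NORMAL };

struct SimpleStore {
    // Existing callers compile unchanged; hot paths pass an explicit hint
    // that a real store could use to pick an I/O priority or queue.
    std::optional<std::string> readValue(const std::string& key,
                                         ReadType type = ReadType::NORMAL) {
        (void)type; // placeholder: no prioritization in this sketch
        return std::nullopt;
    }
};

std::optional<std::string> handleGet(SimpleStore& store, const std::string& key, bool isFetchKeys) {
    // Mirrors the pattern above: choose the hint once at the request
    // boundary, then pass it through every lower-level read.
    ReadType type = isFetchKeys ? ReadType::FETCH : ReadType::NORMAL;
    return store.readValue(key, type);
}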
@ -57,6 +57,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( SLOWTASK_PROFILING_LOG_INTERVAL, 0 ); // A value of 0 means use RUN_LOOP_PROFILING_INTERVAL
init( SLOWTASK_PROFILING_MAX_LOG_INTERVAL, 1.0 );
init( SLOWTASK_PROFILING_LOG_BACKOFF, 2.0 );
init( SLOWTASK_BLOCKED_INTERVAL, 60.0 );
init( SATURATION_PROFILING_LOG_INTERVAL, 0.5 ); // A value of 0 means use RUN_LOOP_PROFILING_INTERVAL
init( SATURATION_PROFILING_MAX_LOG_INTERVAL, 5.0 );
init( SATURATION_PROFILING_LOG_BACKOFF, 2.0 );

@ -125,6 +125,7 @@ public:
double SLOWTASK_PROFILING_LOG_INTERVAL;
double SLOWTASK_PROFILING_MAX_LOG_INTERVAL;
double SLOWTASK_PROFILING_LOG_BACKOFF;
double SLOWTASK_BLOCKED_INTERVAL;
double SATURATION_PROFILING_LOG_INTERVAL;
double SATURATION_PROFILING_MAX_LOG_INTERVAL;
double SATURATION_PROFILING_LOG_BACKOFF;
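These two hunks add the SLOWTASK_BLOCKED_INTERVAL knob in the usual two-part FlowKnobs pattern: declare the field in the header, assign its default in initialize(). A reduced sketch of that pattern, with a hypothetical struct standing in for the real FlowKnobs class:

// Sketch only: the declare-then-initialize knob pattern, assumed names.
struct ProfilingKnobs {
    double SLOWTASK_PROFILING_MAX_LOG_INTERVAL;
    double SLOWTASK_PROFILING_LOG_BACKOFF;
    double SLOWTASK_BLOCKED_INTERVAL;

    void initialize() {
        SLOWTASK_PROFILING_MAX_LOG_INTERVAL = 1.0; // seconds, matches the init above
        SLOWTASK_PROFILING_LOG_BACKOFF = 2.0;      // multiplier applied after each signal
        SLOWTASK_BLOCKED_INTERVAL = 60.0;          // stall duration before a blocked warning is logged
    }
};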
@ -3581,8 +3581,10 @@ void* checkThread(void* arg) {
int64_t lastRunLoopIterations = net2RunLoopIterations.load();
int64_t lastRunLoopSleeps = net2RunLoopSleeps.load();

double slowTaskStart = 0;
double lastSlowTaskSignal = 0;
double lastSaturatedSignal = 0;
double lastSlowTaskBlockedLog = 0;

const double minSlowTaskLogInterval =
std::max(FLOW_KNOBS->SLOWTASK_PROFILING_LOG_INTERVAL, FLOW_KNOBS->RUN_LOOP_PROFILING_INTERVAL);

@ -3603,7 +3605,19 @@ void* checkThread(void* arg) {

if (slowTask) {
double t = timer();
if (lastSlowTaskSignal == 0 || t - lastSlowTaskSignal >= slowTaskLogInterval) {
bool newSlowTask = lastSlowTaskSignal == 0;

if (newSlowTask) {
slowTaskStart = t;
} else if (t - std::max(slowTaskStart, lastSlowTaskBlockedLog) > FLOW_KNOBS->SLOWTASK_BLOCKED_INTERVAL) {
lastSlowTaskBlockedLog = t;
// When this gets logged, it will be with a current timestamp (using timer()). If the network thread
// unblocks, it will log any slow task related events at an earlier timestamp. That means the order of
// events during this sequence will not match their timestamp order.
TraceEvent(SevWarnAlways, "RunLoopBlocked").detail("Duration", t - slowTaskStart);
}

if (newSlowTask || t - lastSlowTaskSignal >= slowTaskLogInterval) {
if (lastSlowTaskSignal > 0) {
slowTaskLogInterval = std::min(FLOW_KNOBS->SLOWTASK_PROFILING_MAX_LOG_INTERVAL,
FLOW_KNOBS->SLOWTASK_PROFILING_LOG_BACKOFF * slowTaskLogInterval);

@ -3614,6 +3628,7 @@ void* checkThread(void* arg) {
pthread_kill(mainThread, SIGPROF);
}
} else {
slowTaskStart = 0;
lastSlowTaskSignal = 0;
lastRunLoopIterations = currentRunLoopIterations;
slowTaskLogInterval = minSlowTaskLogInterval;
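Functionally, the checkThread changes add a second, coarser alarm on top of the existing slow-task profiling: once the run loop has appeared stuck for longer than SLOWTASK_BLOCKED_INTERVAL, a RunLoopBlocked warning is traced and then re-traced at that cadence, while the SIGPROF profiling signal keeps its own backoff schedule. A self-contained sketch of that control flow follows, with the knob values inlined and the trace and signal calls replaced by printf; the names SlowTaskState and onSlowTask are illustrative only.

// Sketch only: the blocked-run-loop check above, reduced to a plain function.
#include <algorithm>
#include <cstdio>

constexpr double SLOWTASK_PROFILING_MAX_LOG_INTERVAL = 1.0; // from the knob init above
constexpr double SLOWTASK_PROFILING_LOG_BACKOFF = 2.0;
constexpr double SLOWTASK_BLOCKED_INTERVAL = 60.0;

struct SlowTaskState {
    double slowTaskStart = 0;
    double lastSlowTaskSignal = 0;
    double lastSlowTaskBlockedLog = 0;
    double slowTaskLogInterval = 0.125; // stands in for minSlowTaskLogInterval
};

// Called from the watchdog thread with the current time t whenever the run
// loop has not advanced since the last check.
void onSlowTask(SlowTaskState& s, double t) {
    bool newSlowTask = (s.lastSlowTaskSignal == 0);
    if (newSlowTask) {
        s.slowTaskStart = t; // first sighting of this stall
    } else if (t - std::max(s.slowTaskStart, s.lastSlowTaskBlockedLog) > SLOWTASK_BLOCKED_INTERVAL) {
        // Long stall: warn now, and again at most once per interval, even
        // though the profiling signal below is rate-limited separately.
        s.lastSlowTaskBlockedLog = t;
        std::printf("RunLoopBlocked, duration=%.1fs\n", t - s.slowTaskStart);
    }
    if (newSlowTask || t - s.lastSlowTaskSignal >= s.slowTaskLogInterval) {
        if (s.lastSlowTaskSignal > 0) {
            // Back off so a very long stall does not flood the profiler.
            s.slowTaskLogInterval = std::min(SLOWTASK_PROFILING_MAX_LOG_INTERVAL,
                                             SLOWTASK_PROFILING_LOG_BACKOFF * s.slowTaskLogInterval);
        }
        s.lastSlowTaskSignal = t;
        // real code: pthread_kill(mainThread, SIGPROF);
    }
}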
@ -1,17 +1,17 @@
#!/bin/sh
#!/usr/bin/env bash
set -Eeuo pipefail
set -x

set -euxo pipefail
DOCKER_ROOT="$(realpath "$(dirname "${BASH_SOURCE[0]}")")"
BUILD_OUTPUT=$(realpath "${DOCKER_ROOT}"/../..)

DOCKER_ROOT=$(realpath $(dirname ${BASH_SOURCE[0]}))
BUILD_OUTPUT=$(realpath ${DOCKER_ROOT}/../..)
echo Docker root: "${DOCKER_ROOT}"
echo Build output: "${BUILD_OUTPUT}"

echo Docker root: $DOCKER_ROOT
echo Build output: $BUILD_OUTPUT

cd ${DOCKER_ROOT}
cd "${DOCKER_ROOT}"

## eg: CMAKE_PROJECT_VERSION:STATIC=7.0.0
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION\: ${BUILD_OUTPUT}/CMakeCache.txt | cut -d '=' -f 2)
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION: "${BUILD_OUTPUT}"/CMakeCache.txt | cut -d '=' -f 2)

# Options (passed via environment variables)

@ -20,18 +20,18 @@ FDB_VERSION=$(grep CMAKE_PROJECT_VERSION\: ${BUILD_OUTPUT}/CMakeCache.txt | cut
TAG=${TAG:-${FDB_VERSION}-${OKTETO_NAME}}
ECR=${ECR:-112664522426.dkr.ecr.us-west-2.amazonaws.com}

echo Building with tag ${TAG}
echo Building with tag "${TAG}"

# Login to ECR
# TODO: Move this to a common place instead of repeatedly copy-pasting it.
aws ecr get-login-password | docker login --username AWS --password-stdin ${ECR}
aws ecr get-login-password | docker login --username AWS --password-stdin "${ECR}"

docker pull ${ECR}/amazonlinux:2.0.20210326.0
docker tag ${ECR}/amazonlinux:2.0.20210326.0 amazonlinux:2.0.20210326.0
docker pull "${ECR}"/amazonlinux:2.0.20210326.0
docker tag "${ECR}"/amazonlinux:2.0.20210326.0 amazonlinux:2.0.20210326.0

#derived variables
# derived variables
IMAGE=foundationdb/foundationdb:${TAG}
SIDECAR=foundationdb/foundationdb-kubernetes-sidecar:${TAG}-1
STRIPPED=${STRIPPED:-false}

@ -41,20 +41,22 @@ STRIPPED=${STRIPPED:-false}

if $STRIPPED; then
rsync -av --delete --exclude=*.xml ${BUILD_OUTPUT}/packages/bin .
rsync -av --delete --exclude=*.a --exclude=*.xml ${BUILD_OUTPUT}/packages/lib .
rsync -av --delete --exclude=*.xml "${BUILD_OUTPUT}"/packages/bin .
rsync -av --delete --exclude=*.a --exclude=*.xml "${BUILD_OUTPUT}"/packages/lib .
else
rsync -av --delete --exclude=*.xml ${BUILD_OUTPUT}/bin .
rsync -av --delete --exclude=*.a --exclude=*.xml ${BUILD_OUTPUT}/lib .
rsync -av --delete --exclude=*.xml "${BUILD_OUTPUT}"/bin .
rsync -av --delete --exclude=*.a --exclude=*.xml "${BUILD_OUTPUT}"/lib .
fi

BUILD_ARGS="--build-arg FDB_VERSION=$FDB_VERSION"

docker build ${BUILD_ARGS} -t ${IMAGE} --target foundationdb -f Dockerfile.eks .
docker build ${BUILD_ARGS} -t ${SIDECAR} --target sidecar -f Dockerfile.eks .

docker tag ${IMAGE} ${ECR}/${IMAGE}
docker tag ${SIDECAR} ${ECR}/${SIDECAR}

docker push ${ECR}/${IMAGE}
docker push ${ECR}/${SIDECAR}
docker build ${BUILD_ARGS} -t "${IMAGE}" --target foundationdb -f Dockerfile.eks .
docker build ${BUILD_ARGS} -t "${SIDECAR}" --target sidecar -f Dockerfile.eks .

docker tag "${IMAGE}" "${ECR}"/"${IMAGE}"
docker tag "${SIDECAR}" "${ECR}"/"${SIDECAR}"

docker push "${ECR}"/"${IMAGE}"
docker push "${ECR}"/"${SIDECAR}"

@ -1,17 +1,17 @@
#!/bin/bash
#!/usr/bin/env bash
set -Eeuo pipefail
set -x

set -euxo pipefail
DOCKER_ROOT="$(realpath "$(dirname "${BASH_SOURCE[0]}")")"
BUILD_OUTPUT=$(realpath "${DOCKER_ROOT}"/../..)

DOCKER_ROOT=$(realpath $(dirname ${BASH_SOURCE[0]}))
BUILD_OUTPUT=$(realpath ${DOCKER_ROOT}/../..)
echo Docker root: "${DOCKER_ROOT}"
echo Build output: "${BUILD_OUTPUT}"

echo Docker root: $DOCKER_ROOT
echo Build output: $BUILD_OUTPUT

cd ${DOCKER_ROOT}
cd "${DOCKER_ROOT}"

## eg: CMAKE_PROJECT_VERSION:STATIC=7.0.0
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION\: ${BUILD_OUTPUT}/CMakeCache.txt | cut -d '=' -f 2)
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION: "${BUILD_OUTPUT}"/CMakeCache.txt | cut -d '=' -f 2)

# Options (passed via environment variables)

@ -20,43 +20,43 @@ FDB_VERSION=$(grep CMAKE_PROJECT_VERSION\: ${BUILD_OUTPUT}/CMakeCache.txt | cut
TAG=${TAG:-${FDB_VERSION}-${OKTETO_NAME}}
ECR=${ECR:-112664522426.dkr.ecr.us-west-2.amazonaws.com}

echo Building with tag ${TAG}
echo Building with tag "${TAG}"

# Login to ECR
# TODO: Move this to a common place instead of repeatedly copy-pasting it.
aws ecr get-login-password | docker login --username AWS --password-stdin ${ECR}
aws ecr get-login-password | docker login --username AWS --password-stdin "${ECR}"

docker pull ${ECR}/ubuntu:18.04
docker tag ${ECR}/ubuntu:18.04 ubuntu:18.04
docker pull ${ECR}/python:3.9-slim
docker tag ${ECR}/python:3.9-slim python:3.9-slim
docker pull "${ECR}"/ubuntu:18.04
docker tag "${ECR}"/ubuntu:18.04 ubuntu:18.04
docker pull "${ECR}"/python:3.9-slim
docker tag "${ECR}"/python:3.9-slim python:3.9-slim

# derived variables
IMAGE=foundationdb/foundationdb:${TAG}
SIDECAR=foundationdb/foundationdb-kubernetes-sidecar:${TAG}-1
IMAGE=foundationdb/foundationdb:"${TAG}"
SIDECAR=foundationdb/foundationdb-kubernetes-sidecar:"${TAG}"-1
STRIPPED=${STRIPPED:-false}

WEBSITE_BIN_DIR=website/downloads/${FDB_VERSION}/linux/
TARBALL=${WEBSITE_BIN_DIR}/fdb_${FDB_VERSION}.tar.gz
mkdir -p ${WEBSITE_BIN_DIR}
WEBSITE_BIN_DIR=website/downloads/"${FDB_VERSION}"/linux
TARBALL=${WEBSITE_BIN_DIR}/fdb_"${FDB_VERSION}".tar.gz
mkdir -p "${WEBSITE_BIN_DIR}"

if $STRIPPED; then
tar -C ~/build_output/packages/ -zcvf ${TARBALL} bin lib
cp ~/build_output/packages/lib/libfdb_c.so ${WEBSITE_BIN_DIR}/libfdb_c_${FDB_VERSION}.so
tar -C ~/build_output/packages/ -zcvf "${TARBALL}" bin lib
cp ~/build_output/packages/lib/libfdb_c.so "${WEBSITE_BIN_DIR}"/libfdb_c_"${FDB_VERSION}".so
else
tar -C ~/build_output/ -zcvf ${TARBALL} bin lib
cp ~/build_output/lib/libfdb_c.so ${WEBSITE_BIN_DIR}/libfdb_c_${FDB_VERSION}.so
tar -C ~/build_output/ -zcvf "${TARBALL}" bin lib
cp ~/build_output/lib/libfdb_c.so "${WEBSITE_BIN_DIR}"/libfdb_c_"${FDB_VERSION}".so
fi

BUILD_ARGS="--build-arg FDB_WEBSITE=file:///mnt/website "
BUILD_ARGS+="--build-arg FDB_VERSION=$FDB_VERSION "
BUILD_ARGS+="--build-arg FDB_ADDITIONAL_VERSIONS=$FDB_VERSION"
BUILD_ARGS="--build-arg FDB_VERSION=${FDB_VERSION}"
BUILD_ARGS+=" --build-arg FDB_WEBSITE=file:///mnt/website"
BUILD_ARGS+=" --build-arg FDB_ADDITIONAL_VERSIONS=${FDB_VERSION}"

docker build -t ${IMAGE} ${BUILD_ARGS} -f release/Dockerfile .
docker build -t ${SIDECAR} ${BUILD_ARGS} -f sidecar/Dockerfile .
docker build ${BUILD_ARGS} -t "${IMAGE}" -f release/Dockerfile .
docker build ${BUILD_ARGS} -t "${SIDECAR}" -f sidecar/Dockerfile .

docker tag ${IMAGE} ${ECR}/${IMAGE}
docker tag ${SIDECAR} ${ECR}/${SIDECAR}
docker tag "${IMAGE}" "${ECR}"/"${IMAGE}"
docker tag "${SIDECAR}" "${ECR}"/"${SIDECAR}"

docker push ${ECR}/${IMAGE}
docker push ${ECR}/${SIDECAR}
docker push "${ECR}"/"${IMAGE}"
docker push "${ECR}"/"${SIDECAR}"

@ -0,0 +1,64 @@
#!/usr/bin/env bash
set -Eeuo pipefail
set -x

DOCKER_ROOT="$(realpath "$(dirname "${BASH_SOURCE[0]}")")"
BUILD_OUTPUT=$(realpath "${DOCKER_ROOT}"/../..)

echo Docker root: "${DOCKER_ROOT}"
echo Build output: "${BUILD_OUTPUT}"

cd "${DOCKER_ROOT}"

## eg: CMAKE_PROJECT_VERSION:STATIC=7.0.0
FDB_VERSION=$(grep CMAKE_PROJECT_VERSION: "${BUILD_OUTPUT}"/CMakeCache.txt | cut -d '=' -f 2)

# Options (passed via environment variables)

# Feel free to customize the image tag.
# TODO: add a mechanism to set TAG=FDB_VERSION when we're building public releases.
TAG=${TAG:-${FDB_VERSION}-${OKTETO_NAME}}
ECR=${ECR:-112664522426.dkr.ecr.us-west-2.amazonaws.com}

echo Building with tag "${TAG}"

# Login to ECR
# TODO: Move this to a common place instead of repeatedly copy-pasting it.
aws ecr get-login-password | docker login --username AWS --password-stdin "${ECR}"

docker pull "${ECR}"/openjdk:17-slim
docker tag "${ECR}"/openjdk:17-slim openjdk:17-slim

# derived variables
IMAGE=foundationdb/ycsb:"${TAG}"

# mvn install fdb-java, compile YCSB
mvn install:install-file \
-Dfile="${BUILD_OUTPUT}"/packages/fdb-java-"${FDB_VERSION}"-PRERELEASE.jar \
-DgroupId=org.foundationdb \
-DartifactId=fdb-java \
-Dversion="${FDB_VERSION}"-PRERELEASE \
-Dpackaging=jar \
-DgeneratePom=true
mkdir "${DOCKER_ROOT}"/YCSB && cd "${DOCKER_ROOT}"/YCSB
git clone https://github.com/FoundationDB/YCSB.git .
sed -i "s/<foundationdb.version>[0-9]\+.[0-9]\+.[0-9]\+<\/foundationdb.version>/<foundationdb.version>${FDB_VERSION}-PRERELEASE<\/foundationdb.version>/g" pom.xml
mvn -pl site.ycsb:foundationdb-binding -am clean package
mkdir -p core/target/dependency
# shellcheck disable=SC2046
cp $(find ~/.m2/ -name jax\*.jar) core/target/dependency/
# shellcheck disable=SC2046
cp $(find ~/.m2/ -name htrace\*.jar) core/target/dependency/
# shellcheck disable=SC2046
cp $(find ~/.m2/ -name HdrHistogram\*.jar) core/target/dependency/
rm -rf .git && cd ..

docker build -t "${IMAGE}" -f ycsb/Dockerfile .

docker tag "${IMAGE}" "${ECR}"/"${IMAGE}"

docker push "${ECR}"/"${IMAGE}"

@ -2,7 +2,7 @@
#
# This source file is part of the FoundationDB open source project
#
# Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
# Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

@ -31,7 +31,8 @@ RUN apt-get update && \
less>=487-0.1 \
vim>=2:8.0.1453-1ubuntu1.4 \
net-tools>=1.60+git20161116.90da8a0-1ubuntu1 \
jq>=1.5+dfsg-2 && \
jq>=1.5+dfsg-2 \
openssl>=1.1.1-1ubuntu2.1~18.04.9 && \
rm -rf /var/lib/apt/lists/*

COPY misc/tini-amd64.sha256sum /tmp/

@ -24,4 +24,4 @@ if [[ -n "$ADDITIONAL_ENV_FILE" ]]; then
source $ADDITIONAL_ENV_FILE
fi

/sidecar.py $*
exec /sidecar.py $*