Merge branch 'master' into add-data-distribution-metrics
This commit is contained in:
commit
e2b6acffce
|
@ -52,25 +52,6 @@ struct FDBLibTLSVerifyTest {
|
|||
std::map<int, Criteria> root_criteria;
|
||||
};
|
||||
|
||||
static std::string printable( std::string const& val ) {
|
||||
static char const digits[] = "0123456789ABCDEF";
|
||||
std::string s;
|
||||
|
||||
for ( int i = 0; i < val.size(); i++ ) {
|
||||
uint8_t b = val[i];
|
||||
if (b >= 32 && b < 127 && b != '\\')
|
||||
s += (char)b;
|
||||
else if (b == '\\')
|
||||
s += "\\\\";
|
||||
else {
|
||||
s += "\\x";
|
||||
s += digits[(b >> 4) & 15];
|
||||
s += digits[b & 15];
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
static std::string criteriaToString(std::map<int, Criteria> const& criteria) {
|
||||
std::string s;
|
||||
for (auto &pair: criteria) {
|
||||
|
|
|
@ -49,17 +49,30 @@ endif()
|
|||
|
||||
# The tests don't build on windows
|
||||
if(NOT WIN32)
|
||||
set(MAKO_SRCS
|
||||
test/mako/mako.c
|
||||
test/mako/mako.h
|
||||
test/mako/utils.c
|
||||
test/mako/utils.h
|
||||
test/mako/zipf.c
|
||||
test/mako/zipf.h)
|
||||
|
||||
if(OPEN_FOR_IDE)
|
||||
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)
|
||||
add_library(fdb_c_ryw_benchmark OBJECT test/ryw_benchmark.c test/test.h)
|
||||
add_library(mako OBJECT ${MAKO_SRCS})
|
||||
else()
|
||||
add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
|
||||
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
|
||||
add_executable(mako ${MAKO_SRCS})
|
||||
strip_debug_symbols(fdb_c_performance_test)
|
||||
strip_debug_symbols(fdb_c_ryw_benchmark)
|
||||
endif()
|
||||
target_link_libraries(fdb_c_performance_test PRIVATE fdb_c)
|
||||
target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c)
|
||||
# do not set RPATH for mako
|
||||
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
|
||||
target_link_libraries(mako PRIVATE fdb_c)
|
||||
endif()
|
||||
|
||||
# TODO: re-enable once the old vcxproj-based build system is removed.
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,138 @@
|
|||
#ifndef MAKO_H
|
||||
#define MAKO_H
|
||||
#pragma once
|
||||
|
||||
#ifndef FDB_API_VERSION
|
||||
#define FDB_API_VERSION 610
|
||||
#endif
|
||||
|
||||
#include <foundationdb/fdb_c.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/types.h>
|
||||
#if defined(__linux__)
|
||||
#include <linux/limits.h>
|
||||
#elif defined(__APPLE__)
|
||||
#include <sys/syslimits.h>
|
||||
#else
|
||||
#include <limits.h>
|
||||
#endif
|
||||
|
||||
#define VERBOSE_NONE 0
|
||||
#define VERBOSE_DEFAULT 1
|
||||
#define VERBOSE_ANNOYING 2
|
||||
#define VERBOSE_DEBUG 3
|
||||
|
||||
#define MODE_INVALID -1
|
||||
#define MODE_CLEAN 0
|
||||
#define MODE_BUILD 1
|
||||
#define MODE_RUN 2
|
||||
|
||||
/* we set mako_txn_t and mako_args_t only once in the master process,
|
||||
* and won't be touched by child processes.
|
||||
*/
|
||||
|
||||
/* transaction specification */
|
||||
#define OP_GETREADVERSION 0
|
||||
#define OP_GET 1
|
||||
#define OP_GETRANGE 2
|
||||
#define OP_SGET 3
|
||||
#define OP_SGETRANGE 4
|
||||
#define OP_UPDATE 5
|
||||
#define OP_INSERT 6
|
||||
#define OP_INSERTRANGE 7
|
||||
#define OP_CLEAR 8
|
||||
#define OP_SETCLEAR 9
|
||||
#define OP_CLEARRANGE 10
|
||||
#define OP_SETCLEARRANGE 11
|
||||
#define OP_COMMIT 12
|
||||
#define MAX_OP 13 /* update this when adding a new operation */
|
||||
|
||||
#define OP_COUNT 0
|
||||
#define OP_RANGE 1
|
||||
#define OP_REVERSE 2
|
||||
|
||||
/* for arguments */
|
||||
#define ARG_KEYLEN 1
|
||||
#define ARG_VALLEN 2
|
||||
#define ARG_TPS 3
|
||||
#define ARG_COMMITGET 4
|
||||
#define ARG_SAMPLING 5
|
||||
#define ARG_VERSION 6
|
||||
#define ARG_KNOBS 7
|
||||
#define ARG_FLATBUFFERS 8
|
||||
#define ARG_TRACE 9
|
||||
#define ARG_TRACEPATH 10
|
||||
|
||||
#define KEYPREFIX "mako"
|
||||
#define KEYPREFIXLEN 4
|
||||
|
||||
typedef struct {
|
||||
/* for each operation, it stores "count", "range" and "reverse" */
|
||||
int ops[MAX_OP][3];
|
||||
} mako_txnspec_t;
|
||||
|
||||
#define KNOB_MAX 256
|
||||
|
||||
/* benchmark parameters */
|
||||
typedef struct {
|
||||
int json;
|
||||
int num_processes;
|
||||
int num_threads;
|
||||
int mode;
|
||||
int rows; /* is 2 billion enough? */
|
||||
int seconds;
|
||||
int iteration;
|
||||
int tps;
|
||||
int sampling;
|
||||
int key_length;
|
||||
int value_length;
|
||||
int zipf;
|
||||
int commit_get;
|
||||
int verbose;
|
||||
mako_txnspec_t txnspec;
|
||||
char cluster_file[PATH_MAX];
|
||||
int trace;
|
||||
char tracepath[PATH_MAX];
|
||||
char knobs[KNOB_MAX];
|
||||
uint8_t flatbuffers;
|
||||
} mako_args_t;
|
||||
|
||||
/* shared memory */
|
||||
#define SIGNAL_RED 0
|
||||
#define SIGNAL_GREEN 1
|
||||
#define SIGNAL_OFF 2
|
||||
|
||||
typedef struct {
|
||||
int signal;
|
||||
int readycount;
|
||||
} mako_shmhdr_t;
|
||||
|
||||
typedef struct {
|
||||
uint64_t xacts;
|
||||
uint64_t conflicts;
|
||||
uint64_t ops[MAX_OP];
|
||||
uint64_t errors[MAX_OP];
|
||||
uint64_t latency_samples[MAX_OP];
|
||||
uint64_t latency_us_total[MAX_OP];
|
||||
uint64_t latency_us_min[MAX_OP];
|
||||
uint64_t latency_us_max[MAX_OP];
|
||||
} mako_stats_t;
|
||||
|
||||
/* per-process information */
|
||||
typedef struct {
|
||||
int worker_id;
|
||||
FDBDatabase *database;
|
||||
mako_args_t *args;
|
||||
mako_shmhdr_t *shm;
|
||||
} process_info_t;
|
||||
|
||||
/* args for threads */
|
||||
typedef struct {
|
||||
int thread_id;
|
||||
process_info_t *process;
|
||||
} thread_args_t;
|
||||
|
||||
/* process type */
|
||||
typedef enum { proc_master = 0, proc_worker, proc_stats } proc_type_t;
|
||||
|
||||
#endif /* MAKO_H */
|
|
@ -0,0 +1,160 @@
|
|||
##############
|
||||
mako Benchmark
|
||||
##############
|
||||
|
||||
| mako (named after a small, but very fast shark) is a micro-benchmark for FoundationDB
|
||||
| which is designed to be very light and flexible
|
||||
| so that you can stress a particular part of an FoundationDB cluster without introducing unnecessary overhead.
|
||||
|
||||
|
||||
How to Build
|
||||
============
|
||||
| ``mako`` gets build automatically when you build FoundationDB.
|
||||
| To build ``mako`` manually, simply build ``mako`` target in the FoundationDB build directory.
|
||||
| e.g. If you're using Unix Makefiles
|
||||
| ``make mako``
|
||||
|
||||
|
||||
Architecture
|
||||
============
|
||||
- mako is a stand-alone program written in C,
|
||||
which communicates to FoundationDB using C binding API (``libfdb_c.so``)
|
||||
- It creates one master process, and one or more worker processes (multi-process)
|
||||
- Each worker process creates one or more multiple threads (multi-thread)
|
||||
- All threads within the same process share the same network thread
|
||||
|
||||
|
||||
Data Specification
|
||||
==================
|
||||
- Key has a fixed prefix + sequential number + padding (e.g. ``mako000000xxxxxx``)
|
||||
- Value is a random string (e.g. ``;+)Rf?H.DS&UmZpf``)
|
||||
|
||||
|
||||
Arguments
|
||||
=========
|
||||
- | ``--mode <mode>``
|
||||
| One of the following modes must be specified. (Required)
|
||||
| - ``clean``: Clean up existing data
|
||||
| - ``build``: Populate data
|
||||
| - ``run``: Run the benchmark
|
||||
|
||||
- | ``-c | --cluster <cluster file>``
|
||||
| FDB cluster file (Required)
|
||||
|
||||
- | ``-p | --procs <procs>``
|
||||
| Number of worker processes (Default: 1)
|
||||
|
||||
- | ``-t | --threads <threads>``
|
||||
| Number of threads per worker process (Default: 1)
|
||||
|
||||
- | ``-r | --rows <rows>``
|
||||
| Number of rows populated (Default: 10000)
|
||||
|
||||
- | ``-s | --seconds <seconds>``
|
||||
| Test duration in seconds (Default: 30)
|
||||
| This option cannot be set with ``--iteration``.
|
||||
|
||||
- | ``-i | --iteration <iters>``
|
||||
| Specify the number of operations to be executed.
|
||||
| This option cannot be set with ``--seconds``.
|
||||
|
||||
- | ``--tps <tps>``
|
||||
| Target total transaction-per-second (TPS) of all worker processes/threads
|
||||
| (Default: Unset / Unthrottled)
|
||||
|
||||
- | ``--keylen <num>``
|
||||
| Key string length in bytes (Default and Minimum: 16)
|
||||
|
||||
- | ``--vallen <num>``
|
||||
| Value string length in bytes (Default and Minimum: 16)
|
||||
|
||||
- | ``-x | --transaction <string>``
|
||||
| Transaction specification described in details in the following section. (Default: ``g10``)
|
||||
|
||||
- | ``-z | --zipf``
|
||||
| Generate a skewed workload based on Zipf distribution (Default: Unset = Uniform)
|
||||
|
||||
- | ``--sampling <num>``
|
||||
| Sampling rate (1 sample / <num> ops) for latency stats
|
||||
|
||||
- | ``--trace``
|
||||
| Enable tracing. The trace file will be created in the current directory.
|
||||
|
||||
- | ``--tracepath <path>``
|
||||
| Enable tracing and set the trace file path.
|
||||
|
||||
- | ``--knobs <knobs>``
|
||||
| Set client knobs
|
||||
|
||||
- | ``--flatbuffers``
|
||||
| Enable flatbuffers
|
||||
|
||||
- | ``--commitget``
|
||||
| Force commit for read-only transactions
|
||||
|
||||
- | ``-v | --verbose <level>``
|
||||
| Set verbose level (Default: 1)
|
||||
| - 0 – Minimal
|
||||
| - 1 – Default
|
||||
| - 2 – Annoying
|
||||
| - 3 – Very Annoying (a.k.a. DEBUG)
|
||||
|
||||
|
||||
Transaction Specification
|
||||
=========================
|
||||
| A transaction may contain multiple operations of multiple types.
|
||||
| You can specify multiple operations for one operation type by specifying "Count".
|
||||
| For RANGE operations, "Range" needs to be specified in addition to "Count".
|
||||
| Every transaction is committed unless it contains only GET / GET RANGE operations.
|
||||
|
||||
Operation Types
|
||||
---------------
|
||||
- ``g`` – GET
|
||||
- ``gr`` – GET RANGE
|
||||
- ``sg`` – Snapshot GET
|
||||
- ``sgr`` – Snapshot GET RANGE
|
||||
- ``u`` – Update (= GET followed by SET)
|
||||
- ``i`` – Insert (= SET with a new key)
|
||||
- ``ir`` – Insert Range (Sequential)
|
||||
- ``c`` – CLEAR
|
||||
- ``sc`` – SET & CLEAR
|
||||
- ``cr`` – CLEAR RANGE
|
||||
- ``scr`` – SET & CLEAR RANGE
|
||||
- ``grv`` – GetReadVersion()
|
||||
|
||||
Format
|
||||
------
|
||||
| One operation type is defined as ``<Type><Count>`` or ``<Type><Count>:<Range>``.
|
||||
| When Count is omitted, it's equivalent to setting it to 1. (e.g. ``g`` is equivalent to ``g1``)
|
||||
| Multiple operation types can be concatenated. (e.g. ``g9u1`` = 9 GETs and 1 update)
|
||||
|
||||
Transaction Specification Examples
|
||||
----------------------------------
|
||||
- | 100 GETs (No Commit)
|
||||
| ``g100``
|
||||
|
||||
- | 10 GET RANGE with Range of 50 (No Commit)
|
||||
| ``gr10:50``
|
||||
|
||||
- | 90 GETs and 10 Updates (Committed)
|
||||
| ``g90u10``
|
||||
|
||||
- | 80 GETs, 10 Updates and 10 Inserts (Committed)
|
||||
| ``g90u10i10``
|
||||
|
||||
|
||||
Execution Examples
|
||||
==================
|
||||
|
||||
Preparation
|
||||
-----------
|
||||
- Start the FoundationDB cluster and create a database
|
||||
- Set LD_LIBRARY_PATH pointing to a proper ``libfdb_c.so``
|
||||
|
||||
Build
|
||||
-----
|
||||
``mako --cluster /etc/foundationdb/fdb.cluster --mode build --rows 1000000 --procs 4``
|
||||
|
||||
Run
|
||||
---
|
||||
``mako --cluster /etc/foundationdb/fdb.cluster --mode run --rows 1000000 --procs 2 --threads 8 --transaction "g8ui" --seconds 60 --tps 1000``
|
|
@ -0,0 +1,81 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <math.h>
|
||||
#include "utils.h"
|
||||
#include "mako.h"
|
||||
|
||||
/* uniform-distribution random */
|
||||
int urand(int low, int high) {
|
||||
double r = rand() / (1.0 + RAND_MAX);
|
||||
int range = high - low + 1;
|
||||
return (int)((r * range) + low);
|
||||
}
|
||||
|
||||
/* random string */
|
||||
/* len is the buffer size, must include null */
|
||||
void randstr(char *str, int len) {
|
||||
int i;
|
||||
for (i = 0; i < len-1; i++) {
|
||||
str[i] = '!' + urand(0, 'z'-'!'); /* generage a char from '!' to 'z' */
|
||||
}
|
||||
str[len-1] = '\0';
|
||||
}
|
||||
|
||||
/* random numeric string */
|
||||
/* len is the buffer size, must include null */
|
||||
void randnumstr(char *str, int len) {
|
||||
int i;
|
||||
for (i = 0; i < len-1; i++) {
|
||||
str[i] = '0' + urand(0, 9); /* generage a char from '!' to 'z' */
|
||||
}
|
||||
str[len-1] = '\0';
|
||||
}
|
||||
|
||||
/* return the first key to be inserted */
|
||||
int insert_begin(int rows, int p_idx, int t_idx, int total_p, int total_t) {
|
||||
double interval = (double)rows / total_p / total_t;
|
||||
return (int)(round(interval * ((p_idx * total_t) + t_idx)));
|
||||
}
|
||||
|
||||
/* return the last key to be inserted */
|
||||
int insert_end(int rows, int p_idx, int t_idx, int total_p, int total_t) {
|
||||
double interval = (double)rows / total_p / total_t;
|
||||
return (int)(round(interval * ((p_idx * total_t) + t_idx + 1) - 1));
|
||||
}
|
||||
|
||||
/* devide val equally among threads */
|
||||
int compute_thread_portion(int val, int p_idx, int t_idx, int total_p, int total_t) {
|
||||
int interval = val / total_p / total_t;
|
||||
int remaining = val - (interval * total_p * total_t);
|
||||
if ((p_idx * total_t + t_idx) < remaining) {
|
||||
return interval+1;
|
||||
} else if (interval == 0) {
|
||||
return -1;
|
||||
}
|
||||
/* else */
|
||||
return interval;
|
||||
}
|
||||
|
||||
/* number of digits */
|
||||
int digits(int num) {
|
||||
int digits = 0;
|
||||
while (num > 0) {
|
||||
num /= 10;
|
||||
digits++;
|
||||
}
|
||||
return digits;
|
||||
}
|
||||
|
||||
|
||||
/* generate a key for a given key number */
|
||||
/* len is the buffer size, key length + null */
|
||||
void genkey(char *str, int num, int rows, int len) {
|
||||
int i;
|
||||
int rowdigit = digits(rows);
|
||||
sprintf(str, KEYPREFIX "%0.*d", rowdigit, num);
|
||||
for (i = (KEYPREFIXLEN + rowdigit); i < len-1; i++) {
|
||||
str[i] = 'x';
|
||||
}
|
||||
str[len-1] = '\0';
|
||||
}
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
#ifndef UTILS_H
|
||||
#define UTILS_H
|
||||
#pragma once
|
||||
|
||||
/* uniform-distribution random */
|
||||
/* return a uniform random number between low and high, both inclusive */
|
||||
int urand(int low, int high);
|
||||
|
||||
/* write a random string of the length of (len-1) to memory pointed by str
|
||||
* with a null-termination character at str[len-1].
|
||||
*/
|
||||
void randstr(char *str, int len);
|
||||
|
||||
/* write a random numeric string of the length of (len-1) to memory pointed by str
|
||||
* with a null-termination character at str[len-1].
|
||||
*/
|
||||
void randnumstr(char *str, int len);
|
||||
|
||||
/* given the total number of rows to be inserted,
|
||||
* the worker process index p_idx and the thread index t_idx (both 0-based),
|
||||
* and the total number of processes, total_p, and threads, total_t,
|
||||
* returns the first row number assigned to this partition.
|
||||
*/
|
||||
int insert_begin(int rows, int p_idx, int t_idx, int total_p, int total_t);
|
||||
|
||||
/* similar to insert_begin, insert_end returns the last row numer */
|
||||
int insert_end(int rows, int p_idx, int t_idx, int total_p, int total_t);
|
||||
|
||||
/* devide a value equally among threads */
|
||||
int compute_thread_portion(int val, int p_idx, int t_idx, int total_p,
|
||||
int total_t);
|
||||
|
||||
/* similar to insert_begin/end, compute_thread_tps computes
|
||||
* the per-thread target TPS for given configuration.
|
||||
*/
|
||||
#define compute_thread_tps(val, p_idx, t_idx, total_p, total_t) \
|
||||
compute_thread_portion(val, p_idx, t_idx, total_p, total_t)
|
||||
|
||||
/* similar to compute_thread_tps,
|
||||
* compute_thread_iters computs the number of iterations.
|
||||
*/
|
||||
#define compute_thread_iters(val, p_idx, t_idx, total_p, total_t) \
|
||||
compute_thread_portion(val, p_idx, t_idx, total_p, total_t)
|
||||
|
||||
/* get the number of digits */
|
||||
int digits(int num);
|
||||
|
||||
/* generate a key for a given key number */
|
||||
/* len is the buffer size, key length + null */
|
||||
void genkey(char *str, int num, int rows, int len);
|
||||
|
||||
#endif /* UTILS_H */
|
|
@ -0,0 +1,132 @@
|
|||
/*
|
||||
* zipfian distribution copied from YCSB
|
||||
* https://github.com/brianfrankcooper/YCSB
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <math.h>
|
||||
#include <time.h>
|
||||
#include "zipf.h"
|
||||
|
||||
/* global static */
|
||||
static int items;
|
||||
static int base;
|
||||
static double zipfianconstant;
|
||||
static double alpha, zetan, eta, theta, zeta2theta;
|
||||
static int countforzeta;
|
||||
static int allowitemcountdecrease = 0;
|
||||
|
||||
/* declarations */
|
||||
double zetastatic2(int st, int n, double theta, double initialsum);
|
||||
double zeta2(int st, int n, double theta_val, double initialsum);
|
||||
double zetastatic(int n, double theta);
|
||||
double zeta(int n, double theta_val);
|
||||
|
||||
|
||||
double rand_double() {
|
||||
return (double)rand() / (double)RAND_MAX;
|
||||
}
|
||||
|
||||
|
||||
int next_int(int itemcount) {
|
||||
double u, uz;
|
||||
int ret;
|
||||
|
||||
if (itemcount != countforzeta) {
|
||||
zetan = zeta2(countforzeta, itemcount, theta, zetan);
|
||||
} else if ((itemcount < countforzeta) && (allowitemcountdecrease)) {
|
||||
zetan = zeta(itemcount, theta);
|
||||
}
|
||||
eta = (1 - pow(2.0 / items, 1 - theta)) / (1 - zeta2theta / zetan);
|
||||
|
||||
u = rand_double();
|
||||
uz = u * zetan;
|
||||
|
||||
if (uz < 1.0) {
|
||||
return base;
|
||||
}
|
||||
|
||||
if (uz < 1.0 + pow(0.5, theta)) {
|
||||
return base + 1;
|
||||
}
|
||||
|
||||
ret = base + (int)(itemcount * pow(eta * u - eta + 1, alpha));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int zipfian_next() {
|
||||
return next_int(items);
|
||||
}
|
||||
|
||||
double zetastatic2(int st, int n, double theta, double initialsum) {
|
||||
int i;
|
||||
double sum = initialsum;
|
||||
for (i = st; i < n; i++) {
|
||||
sum += 1 / pow(i + 1, theta);
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
double zeta2(int st, int n, double theta_val, double initialsum) {
|
||||
countforzeta = n;
|
||||
return zetastatic2(st, n, theta_val, initialsum);
|
||||
}
|
||||
|
||||
double zetastatic(int n, double theta) {
|
||||
return zetastatic2(0, n, theta, 0);
|
||||
}
|
||||
|
||||
double zeta(int n, double theta_val) {
|
||||
countforzeta = n;
|
||||
return zetastatic(n, theta_val);
|
||||
}
|
||||
|
||||
void zipfian_generator4(int min, int max, double _zipfianconstant, double _zetan) {
|
||||
items = max - min + 1;
|
||||
base = min;
|
||||
zipfianconstant = _zipfianconstant;
|
||||
|
||||
theta = zipfianconstant;
|
||||
zeta2theta = zeta(2, theta);
|
||||
alpha = 1.0 / (1.0 - theta);
|
||||
zetan = _zetan;
|
||||
countforzeta = items;
|
||||
eta = (1 - pow(2.0 / items, 1 - theta)) / (1 - zeta2theta / zetan);
|
||||
|
||||
zipfian_next();
|
||||
}
|
||||
|
||||
void zipfian_generator3(int min, int max, double zipfianconstant) {
|
||||
zipfian_generator4(min, max, zipfianconstant, zetastatic(max - min + 1, zipfianconstant));
|
||||
}
|
||||
|
||||
void zipfian_generator2(int min, int max) {
|
||||
zipfian_generator3(min, max, ZIPFIAN_CONSTANT);
|
||||
}
|
||||
|
||||
void zipfian_generator(int items) {
|
||||
zipfian_generator2(0, items - 1);
|
||||
}
|
||||
|
||||
|
||||
#if 0 /* test */
|
||||
void main() {
|
||||
int i = 0;
|
||||
int histogram[1000] = { 0 };
|
||||
|
||||
srand(time(0));
|
||||
|
||||
zipfian_generator(1000);
|
||||
|
||||
for (i = 0; i < 1000000; i++) {
|
||||
int val = next_value();
|
||||
//printf("%d\n", val);
|
||||
histogram[val]++;
|
||||
}
|
||||
|
||||
for (i = 0; i < 1000; i++) {
|
||||
printf("%d\n", histogram[i]);
|
||||
}
|
||||
}
|
||||
#endif
|
|
@ -0,0 +1,15 @@
|
|||
/*
|
||||
* zipfian distribution copied from YCSB
|
||||
* https://github.com/brianfrankcooper/YCSB
|
||||
*/
|
||||
|
||||
#ifndef ZIPF_H
|
||||
#define ZIPF_H
|
||||
#pragma once
|
||||
|
||||
#define ZIPFIAN_CONSTANT 0.99
|
||||
|
||||
void zipfian_generator(int items);
|
||||
int zipfian_next();
|
||||
|
||||
#endif /* ZIPF_H */
|
|
@ -258,8 +258,6 @@ namespace FDB {
|
|||
|
||||
typedef Standalone<KeyRangeRef> KeyRange;
|
||||
|
||||
std::string printable( const StringRef& val );
|
||||
|
||||
template <class T>
|
||||
static std::string describe(T const& item) {
|
||||
return item.toString();
|
||||
|
|
|
@ -81,10 +81,6 @@ void fdb_flow_test() {
|
|||
fdb->setupNetwork();
|
||||
startThread(networkThread, fdb);
|
||||
|
||||
int randomSeed = platform::getRandomSeed();
|
||||
|
||||
setThreadLocalDeterministicRandomSeed(randomSeed);
|
||||
|
||||
g_network = newNet2( false );
|
||||
|
||||
openTraceFile(NetworkAddress(), 1000000, 1000000, ".");
|
||||
|
@ -426,16 +422,4 @@ namespace FDB {
|
|||
void TransactionImpl::reset() {
|
||||
fdb_transaction_reset( tr );
|
||||
}
|
||||
|
||||
std::string printable( const StringRef& val ) {
|
||||
std::string s;
|
||||
for(int i=0; i<val.size(); i++) {
|
||||
uint8_t b = val[i];
|
||||
if (b >= 32 && b < 127 && b != '\\') s += (char)b;
|
||||
else if (b == '\\') s += "\\\\";
|
||||
else s += format("\\x%02x", b);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ struct DirectoryCreateSubspaceFunc : InstructionFunc {
|
|||
state Tuple path = wait(popTuple(data));
|
||||
Tuple rawPrefix = wait(data->stack.waitAndPop());
|
||||
|
||||
logOp(format("Created subspace at %s: %s", tupleToString(path).c_str(), printable(rawPrefix.getString(0)).c_str()));
|
||||
logOp(format("Created subspace at %s: %s", tupleToString(path).c_str(), rawPrefix.getString(0).printable().c_str()));
|
||||
data->directoryData.push(new Subspace(path, rawPrefix.getString(0)));
|
||||
return Void();
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ struct DirectoryCreateLayerFunc : InstructionFunc {
|
|||
else {
|
||||
Subspace* nodeSubspace = data->directoryData.directoryList[index1].subspace.get();
|
||||
Subspace* contentSubspace = data->directoryData.directoryList[index2].subspace.get();
|
||||
logOp(format("Create directory layer: node_subspace (%d) = %s, content_subspace (%d) = %s, allow_manual_prefixes = %d", index1, printable(nodeSubspace->key()).c_str(), index2, printable(nodeSubspace->key()).c_str(), allowManualPrefixes));
|
||||
logOp(format("Create directory layer: node_subspace (%d) = %s, content_subspace (%d) = %s, allow_manual_prefixes = %d", index1, nodeSubspace->key().printable().c_str(), index2, nodeSubspace->key().printable().c_str(), allowManualPrefixes));
|
||||
data->directoryData.push(Reference<IDirectory>(new DirectoryLayer(*nodeSubspace, *contentSubspace, allowManualPrefixes)));
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ struct DirectoryChangeFunc : InstructionFunc {
|
|||
|
||||
if(LOG_DIRS) {
|
||||
DirectoryOrSubspace d = data->directoryData.directoryList[data->directoryData.directoryListIndex];
|
||||
printf("Changed directory to %d (%s @\'%s\')\n", data->directoryData.directoryListIndex, d.typeString().c_str(), d.directory.present() ? pathToString(d.directory.get()->getPath()).c_str() : printable(d.subspace.get()->key()).c_str());
|
||||
printf("Changed directory to %d (%s @\'%s\')\n", data->directoryData.directoryListIndex, d.typeString().c_str(), d.directory.present() ? pathToString(d.directory.get()->getPath()).c_str() : d.subspace.get()->key().printable().c_str());
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
|
@ -192,7 +192,7 @@ struct DirectoryCreateOrOpenFunc : InstructionFunc {
|
|||
Standalone<StringRef> layer = layerTuple.getType(0) == Tuple::NULL_TYPE ? StringRef() : layerTuple.getString(0);
|
||||
|
||||
Reference<IDirectory> directory = data->directoryData.directory();
|
||||
logOp(format("create_or_open %s: layer=%s", pathToString(combinePaths(directory->getPath(), path)).c_str(), printable(layer).c_str()));
|
||||
logOp(format("create_or_open %s: layer=%s", pathToString(combinePaths(directory->getPath(), path)).c_str(), layer.printable().c_str()));
|
||||
|
||||
Reference<DirectorySubspace> dirSubspace = wait(executeMutation(instruction, [this, directory, layer] () {
|
||||
return directory->createOrOpen(instruction->tr, path, layer);
|
||||
|
@ -217,7 +217,7 @@ struct DirectoryCreateFunc : InstructionFunc {
|
|||
Optional<Standalone<StringRef>> prefix = args[1].getType(0) == Tuple::NULL_TYPE ? Optional<Standalone<StringRef>>() : args[1].getString(0);
|
||||
|
||||
Reference<IDirectory> directory = data->directoryData.directory();
|
||||
logOp(format("create %s: layer=%s, prefix=%s", pathToString(combinePaths(directory->getPath(), path)).c_str(), printable(layer).c_str(), prefix.present() ? printable(prefix.get()).c_str() : "<not present>"));
|
||||
logOp(format("create %s: layer=%s, prefix=%s", pathToString(combinePaths(directory->getPath(), path)).c_str(), layer.printable().c_str(), prefix.present() ? prefix.get().printable().c_str() : "<not present>"));
|
||||
|
||||
Reference<DirectorySubspace> dirSubspace = wait(executeMutation(instruction, [this, directory, layer, prefix] () {
|
||||
return directory->create(instruction->tr, path, layer, prefix);
|
||||
|
@ -241,7 +241,7 @@ struct DirectoryOpenFunc : InstructionFunc {
|
|||
Standalone<StringRef> layer = layerTuple.getType(0) == Tuple::NULL_TYPE ? StringRef() : layerTuple.getString(0);
|
||||
|
||||
Reference<IDirectory> directory = data->directoryData.directory();
|
||||
logOp(format("open %s: layer=%s", pathToString(combinePaths(directory->getPath(), path)).c_str(), printable(layer).c_str()));
|
||||
logOp(format("open %s: layer=%s", pathToString(combinePaths(directory->getPath(), path)).c_str(), layer.printable().c_str()));
|
||||
Reference<DirectorySubspace> dirSubspace = wait(directory->open(instruction->tr, path, layer));
|
||||
data->directoryData.push(dirSubspace);
|
||||
|
||||
|
@ -433,7 +433,7 @@ struct DirectoryUnpackKeyFunc : InstructionFunc {
|
|||
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
|
||||
Tuple key = wait(data->stack.waitAndPop());
|
||||
Subspace *subspace = data->directoryData.subspace();
|
||||
logOp(format("Unpack %s in subspace with prefix %s", printable(key.getString(0)).c_str(), printable(subspace->key()).c_str()));
|
||||
logOp(format("Unpack %s in subspace with prefix %s", key.getString(0).printable().c_str(), subspace->key().printable().c_str()));
|
||||
Tuple tuple = subspace->unpack(key.getString(0));
|
||||
for(int i = 0; i < tuple.size(); ++i) {
|
||||
data->stack.push(tuple.subTuple(i, i+1).pack());
|
||||
|
@ -483,7 +483,7 @@ struct DirectoryOpenSubspaceFunc : InstructionFunc {
|
|||
ACTOR static Future<Void> call(Reference<FlowTesterData> data, Reference<InstructionData> instruction) {
|
||||
Tuple tuple = wait(popTuple(data));
|
||||
Subspace *subspace = data->directoryData.subspace();
|
||||
logOp(format("open_subspace %s (at %s)", tupleToString(tuple).c_str(), printable(subspace->key()).c_str()));
|
||||
logOp(format("open_subspace %s (at %s)", tupleToString(tuple).c_str(), subspace->key().printable().c_str()));
|
||||
Subspace *child = new Subspace(subspace->subspace(tuple));
|
||||
data->directoryData.push(child);
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ std::string tupleToString(Tuple const& tuple) {
|
|||
if(type == Tuple::UTF8) {
|
||||
str += "u";
|
||||
}
|
||||
str += "\'" + printable(tuple.getString(i)) + "\'";
|
||||
str += "\'" + tuple.getString(i).printable() + "\'";
|
||||
}
|
||||
else if(type == Tuple::INT) {
|
||||
str += format("%ld", tuple.getInt(i));
|
||||
|
@ -220,9 +220,9 @@ ACTOR static Future<Void> debugPrintRange(Reference<Transaction> tr, std::string
|
|||
|
||||
Standalone<RangeResultRef> results = wait(getRange(tr, KeyRange(KeyRangeRef(subspace + '\x00', subspace + '\xff'))));
|
||||
printf("==================================================DB:%s:%s, count:%d\n", msg.c_str(),
|
||||
printable(subspace).c_str(), results.size());
|
||||
StringRef(subspace).printable().c_str(), results.size());
|
||||
for (auto & s : results) {
|
||||
printf("=====key:%s, value:%s\n", printable(StringRef(s.key)).c_str(), printable(StringRef(s.value)).c_str());
|
||||
printf("=====key:%s, value:%s\n", StringRef(s.key).printable().c_str(), StringRef(s.value).printable().c_str());
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -1791,7 +1791,7 @@ ACTOR void _test_versionstamp() {
|
|||
|
||||
ASSERT(trVersion.compare(dbVersion) == 0);
|
||||
|
||||
fprintf(stderr, "%s\n", printable(trVersion).c_str());
|
||||
fprintf(stderr, "%s\n", trVersion.printable().c_str());
|
||||
|
||||
g_network->stop();
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ package directory
|
|||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb"
|
||||
"github.com/apple/foundationdb/bindings/go/src/fdb/subspace"
|
||||
)
|
||||
|
@ -54,6 +55,18 @@ const (
|
|||
_MICROVERSION int32 = 0
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrDirAlreadyExists is returned when trying to create a directory while it already exists.
|
||||
ErrDirAlreadyExists = errors.New("the directory already exists")
|
||||
|
||||
// ErrDirNotExists is returned when opening or listing a directory that does not exist.
|
||||
ErrDirNotExists = errors.New("the directory does not exist")
|
||||
|
||||
// ErrParentDirDoesNotExist is returned when opening a directory and one or more
|
||||
// parent directories in the path do not exist.
|
||||
ErrParentDirDoesNotExist = errors.New("the parent directory does not exist")
|
||||
)
|
||||
|
||||
// Directory represents a subspace of keys in a FoundationDB database,
|
||||
// identified by a hierarchical path.
|
||||
type Directory interface {
|
||||
|
@ -69,8 +82,9 @@ type Directory interface {
|
|||
CreateOrOpen(t fdb.Transactor, path []string, layer []byte) (DirectorySubspace, error)
|
||||
|
||||
// Open opens the directory specified by path (relative to this Directory),
|
||||
// and returns the directory and its contents as a DirectorySubspace (or an
|
||||
// error if the directory does not exist).
|
||||
// and returns the directory and its contents as a DirectorySubspace (or ErrDirNotExists
|
||||
// error if the directory does not exist, or ErrParentDirDoesNotExist if one of the parent
|
||||
// directories in the path does not exist).
|
||||
//
|
||||
// If the byte slice layer is specified, it is compared against the layer
|
||||
// specified when the directory was created, and an error is returned if
|
||||
|
@ -79,7 +93,7 @@ type Directory interface {
|
|||
|
||||
// Create creates a directory specified by path (relative to this
|
||||
// Directory), and returns the directory and its contents as a
|
||||
// DirectorySubspace (or an error if the directory already exists).
|
||||
// DirectorySubspace (or ErrDirAlreadyExists if the directory already exists).
|
||||
//
|
||||
// If the byte slice layer is specified, it is recorded as the layer and
|
||||
// will be checked when opening the directory in the future.
|
||||
|
|
|
@ -99,7 +99,7 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti
|
|||
}
|
||||
|
||||
if !allowOpen {
|
||||
return nil, errors.New("the directory already exists")
|
||||
return nil, ErrDirAlreadyExists
|
||||
}
|
||||
|
||||
if layer != nil {
|
||||
|
@ -112,7 +112,7 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti
|
|||
}
|
||||
|
||||
if !allowCreate {
|
||||
return nil, errors.New("the directory does not exist")
|
||||
return nil, ErrDirNotExists
|
||||
}
|
||||
|
||||
if e := dl.checkVersion(rtr, tr); e != nil {
|
||||
|
@ -161,7 +161,7 @@ func (dl directoryLayer) createOrOpen(rtr fdb.ReadTransaction, tr *fdb.Transacti
|
|||
}
|
||||
|
||||
if parentNode == nil {
|
||||
return nil, errors.New("the parent directory does not exist")
|
||||
return nil, ErrParentDirDoesNotExist
|
||||
}
|
||||
|
||||
node := dl.nodeWithPrefix(prefix)
|
||||
|
@ -254,7 +254,7 @@ func (dl directoryLayer) List(rt fdb.ReadTransactor, path []string) ([]string, e
|
|||
|
||||
node := dl.find(rtr, path).prefetchMetadata(rtr)
|
||||
if !node.exists() {
|
||||
return nil, errors.New("the directory does not exist")
|
||||
return nil, ErrDirNotExists
|
||||
}
|
||||
|
||||
if node.isInPartition(nil, true) {
|
||||
|
|
|
@ -6,6 +6,8 @@ set(WITH_UNDODB OFF CACHE BOOL "Use rr or undodb")
|
|||
set(USE_ASAN OFF CACHE BOOL "Compile with address sanitizer")
|
||||
set(FDB_RELEASE OFF CACHE BOOL "This is a building of a final release")
|
||||
set(USE_LD "LD" CACHE STRING "The linker to use for building: can be LD (system default, default choice), GOLD, or LLD")
|
||||
set(USE_LIBCXX OFF CACHE BOOL "Use libc++")
|
||||
set(USE_CCACHE OFF CACHE BOOL "Use ccache for compilation if available")
|
||||
|
||||
if(USE_GPERFTOOLS)
|
||||
find_package(Gperftools REQUIRED)
|
||||
|
@ -44,6 +46,22 @@ else()
|
|||
add_definitions(-DUSE_UCONTEXT)
|
||||
endif()
|
||||
|
||||
if ((NOT USE_CCACHE) AND (NOT "$ENV{USE_CCACHE}" STREQUAL ""))
|
||||
string(TOUPPER "$ENV{USE_CCACHE}" USE_CCACHEENV)
|
||||
if (("${USE_CCACHEENV}" STREQUAL "ON") OR ("${USE_CCACHEENV}" STREQUAL "1") OR ("${USE_CCACHEENV}" STREQUAL "YES"))
|
||||
set(USE_CCACHE ON)
|
||||
endif()
|
||||
endif()
|
||||
if (USE_CCACHE)
|
||||
FIND_PROGRAM(CCACHE_FOUND "ccache")
|
||||
if(CCACHE_FOUND)
|
||||
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
|
||||
set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache)
|
||||
else()
|
||||
message(SEND_ERROR "CCACHE is ON, but ccache was not found")
|
||||
endif()
|
||||
endif()
|
||||
|
||||
include(CheckFunctionExists)
|
||||
set(CMAKE_REQUIRED_INCLUDES stdlib.h malloc.h)
|
||||
set(CMAKE_REQUIRED_LIBRARIES c)
|
||||
|
@ -115,8 +133,12 @@ else()
|
|||
add_compile_options(-DVALGRIND -DUSE_VALGRIND)
|
||||
endif()
|
||||
if (CLANG)
|
||||
if (APPLE)
|
||||
add_compile_options(-stdlib=libc++)
|
||||
if (APPLE OR USE_LIBCXX)
|
||||
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
|
||||
add_compile_definitions(WITH_LIBCXX)
|
||||
if (NOT APPLE)
|
||||
add_link_options(-stdlib=libc++ -lc++abi -Wl,-build-id=sha1)
|
||||
endif()
|
||||
endif()
|
||||
add_compile_options(
|
||||
-Wno-unknown-warning-option
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
set(FORCE_ALL_COMPONENTS OFF CACHE BOOL "Fails cmake if not all dependencies are found")
|
||||
|
||||
################################################################################
|
||||
# Valgrind
|
||||
################################################################################
|
||||
|
||||
if(USE_VALGRIND)
|
||||
find_package(Valgrind REQUIRED)
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# LibreSSL
|
||||
################################################################################
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
find_path(VALGRIND_INCLUDE_DIR
|
||||
NAMES
|
||||
valgrind.h
|
||||
PATH_SUFFIXES include valgrind)
|
||||
|
||||
find_package_handle_standard_args(Valgrind
|
||||
REQUIRED_VARS VALGRIND_INCLUDE_DIR
|
||||
FAIL_MESSAGE "Could not find Valgrind header files, try set the path to the Valgrind headers in the variable Valgrind_ROOT")
|
||||
|
||||
if(VALGRIND_FOUND)
|
||||
add_library(Valgrind INTERFACE)
|
||||
target_include_directories(Valgrind INTERFACE "${VALGRIND_INCLUDE_DIR}")
|
||||
endif()
|
|
@ -0,0 +1,91 @@
|
|||
## FDB Backup Data Format
|
||||
|
||||
### Introduction
|
||||
This document describes the data format of the files generated by FoundationDB (FDB) backup procedure.
|
||||
The target readers who may benefit from reading this document are:
|
||||
* who make changes on the current backup or restore procedure;
|
||||
* who writes tools to digest the backup data for analytical purpose;
|
||||
* who wants to understand the internals of how backup and restore works.
|
||||
|
||||
The description of the backup data format is based on FDB 5.2 to FDB 6.1. The backup data format may (although unlikely) change after FDB 6.1.
|
||||
|
||||
|
||||
### Files generated by backup
|
||||
The backup procedure generates two types of files: range files and log files.
|
||||
* A range file describes key-value pairs in a range at the version when the backup process takes a snapshot of the range. Different range files have data for different ranges at different versions.
|
||||
* A log file describes the mutations taken from a version v<sub>1</sub> to v<sub>2</sub> during the backup procedure.
|
||||
|
||||
With the key-value pairs in range file and the mutations in log file, the restore procedure can restore the database into a consistent state at a user-provided version v<sub>k</sub> if the backup data is claimed by the restore as restorable at v<sub>k</sub>. (The details of determining if a set of backup data is restorable at a version is out of scope of this document and can be found at [backup.md](https://github.com/xumengpanda/foundationdb/blob/cd873831ecd18653c5bf459d6f72d14a99b619c4/design/backup.md).
|
||||
|
||||
|
||||
### Filename conventions
|
||||
The backup files will be saved in a directory (i.e., url) specified by users. Under the directory, the range files are in the `snapshots` folder. The log files are in the `logs` folder.
|
||||
|
||||
The convention of the range filename is ` snapshots/snapshot,beginVersion,beginVersion,blockSize`, where `beginVersion` is the version when the key-values in the range file are recorded, and blockSize is the size of data blocks in the range file.
|
||||
|
||||
The convention of the log filename is `logs/,versionPrefix/log,beginVersion,endVersion,randomUID, blockSize`, where the versionPrefix is a 2-level path (`x/y`) where beginVersion should go such that `x/y/*` contains (10^smallestBucket) possible versions; the randomUID is a random UID, the `beginVersion` and `endVersion` are the version range (left inclusive, right exclusive) when the mutations are recorded; and the `blockSize` is the data block size in the log file.
|
||||
|
||||
We will use an example to explain what each field in the range and log filename means.
|
||||
Suppose under the backup directory, we have a range file `snapshots/snapshot,78994177,78994177,97` and a log file `logs/0000/0000/log,78655645,98655645,149a0bdfedecafa2f648219d5eba816e,1048576`.
|
||||
The range file’s filename tells us that all key-value pairs decoded from the file are the KV value in DB at the version `78994177`. The data block size is `97` bytes.
|
||||
The log file’s filename tells us that the mutations in the log file were the mutations in the DB during the version range `[78655645,98655645)`, and the data block size is `1048576` bytes.
|
||||
|
||||
|
||||
### Data format in a range file
|
||||
A range file can have one to many data blocks. Each data block has a set of key-value pairs.
|
||||
A data block is encoded as follows: `Header startKey k1v1 k2v2 Padding`.
|
||||
|
||||
|
||||
Example:
|
||||
|
||||
The client code writes keys in this sequence:
|
||||
a c d e f g h i j z
|
||||
The backup procedure records the key-value pairs in the database into range file.
|
||||
|
||||
H = header P = padding a...z = keys v = value | = block boundary
|
||||
|
||||
Encoded file: H a cv dv ev P | H e ev fv gv hv P | H h hv iv jv z
|
||||
Decoded in blocks yields:
|
||||
Block 1: range [a, e) with kv pairs cv, dv
|
||||
Block 2: range [e, h) with kv pairs ev, fv, gv
|
||||
Block 3: range [h, z) with kv pairs hv, iv, jv
|
||||
|
||||
NOTE: All blocks except for the final block will have one last value which will not be used. This isn't actually a waste since if the next KV pair wouldn't fit within the block after the value then the space after the final key to the next 1MB boundary would just be padding anyway.
|
||||
|
||||
The code related to how a range file is written is in the `struct RangeFileWriter` in `namespace fileBackup`.
|
||||
|
||||
The code that decodes a range block is in `ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<IAsyncFile> file, int64_t offset, int len)`.
|
||||
|
||||
|
||||
### Data format in a log file
|
||||
A log file can have one to many data blocks.
|
||||
Each block is encoded as `Header, [Param1, Param2]... padding`.
|
||||
The first 32bits in `Param1` and `Param2` specifies the length of the `Param1` and `Param2`.
|
||||
`Param1` specifies the version when the mutations happened;
|
||||
`Param2` encodes the group of mutations happened at the version.
|
||||
|
||||
Note that if the group of mutations is bigger than the block size, the mutation group will be split across multiple data blocks.
|
||||
For example, we may get `[Param1, Param2_part0]`, `[Param1, Param2_part1]`. By concatenating the `Param2_part0` and `Param2_part1`, we can get the group of all mutations happened in the version specified in `Param1`.
|
||||
|
||||
The encoding format for `Param1` is as follows:
|
||||
`hashValue|commitVersion|part`,
|
||||
where `hashValue` is the hash of the commitVersion, `commitVersion` is the version when the mutations in `Param2`(s) are taken, and `part` is the part number in case we need to concatenate the `Param2` to get the group of all mutations.
|
||||
`hashValue` takes 8bits, `commitVersion` takes 64bits, and `part` takes 32bits.
|
||||
|
||||
Note that in case of concatenating the partial group of mutations in `Param2` to get the full group of all mutations, the part number should be continuous.
|
||||
|
||||
The encoding format for the group of mutations, which is Param2 or the concatenated Param2 in case of partial group of mutations in a block, is as follows:
|
||||
`length_of_the_mutation_group | encoded_mutation_1 | … | encoded_mutation_k`.
|
||||
The `encoded_mutation_i` is encoded as follows
|
||||
`type|kLen|vLen|Key|Value`
|
||||
where type is the mutation type, such as Set or Clear, `kLen` and `vLen` respectively are the length of the key and value in the mutation. `Key` and `Value` are the serialized value of the Key and Value in the mutation.
|
||||
|
||||
The code related to how a log file is written is in the `struct LogFileWriter` in `namespace fileBackup`.
|
||||
|
||||
The code that decodes a mutation block is in `ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file, int64_t offset, int len)`.
|
||||
|
||||
|
||||
### Endianness
|
||||
When the restore decodes a serialized integer from the backup file, it needs to convert the serialized value from big endian to little endian.
|
||||
|
||||
The reason is as follows: When the backup procedure transfers the data to remote blob store, the backup data is encoded in big endian. However, FoundationDB currently only run on little endian machines. The endianness affects the interpretation of an integer, so we must perform the endianness convertion.
|
|
@ -530,7 +530,7 @@ The second feature is the ability to add one or more synchronous replicas of the
|
|||
|
||||
An example configuration would be four total datacenters, two on the east coast, two on the west coast, with a preference for fast write latencies from the west coast. One datacenter on each coast would be sized to store a full copy of the data. The second datacenter on each coast would only have a few FoundationDB processes.
|
||||
|
||||
While everything is healthy, writes need to be made durable in both west coast datacenters before a commit can succeed. The geographic proximity of the two datacenters minimizes the additional commit latency. Reads can be served from either region, and clients can get data from whichever region is closer. Getting a read version from the each coast region will still require communicating with a west coast datacenter. Clients can cache read versions if they can tolerate reading stale data to avoid waiting on read versions.
|
||||
While everything is healthy, writes need to be made durable in both west coast datacenters before a commit can succeed. The geographic proximity of the two datacenters minimizes the additional commit latency. Reads can be served from either region, and clients can get data from whichever region is closer. Getting a read version from east coast region will still require communicating with a west coast datacenter. Clients can cache read versions if they can tolerate reading stale data to avoid waiting on read versions.
|
||||
|
||||
If either west coast datacenter fails, the last few mutations will be propagated from the remaining west coast datacenter to the east coast. At this point, FoundationDB will start accepting commits on the east coast. Once the west coast comes back online, the system will automatically start copying all the data that was committed to the east coast back to the west coast replica. Once the west coast has caught up, the system will automatically switch back to accepting writes from the west coast again.
|
||||
|
||||
|
@ -615,7 +615,7 @@ The number of replicas in each region is controlled by redundancy level. For exa
|
|||
Asymmetric configurations
|
||||
-------------------------
|
||||
|
||||
The fact that satellite policies are configured per region allows for asymmetric configurations. For example, FoudnationDB can have a three datacenter setup where there are two datacenters on the west coast (WC1, WC2) and one datacenter on the east coast (EC1). The west coast region can be set as the preferred active region by setting the priority of its primary datacenter higher than the east coast datacenter. The west coast region should have a satellite policy configured, so that when it is active, FoundationDB is making mutations durable in both west coast datacenters. In the rare event that one of the west coast datacenter have failed, FoundationDB will fail over to the east coast datacenter. Because this region does not a satellite datacenter, the mutations will only be made durable in one datacenter while the transaction subsystem is located here. However this is justifiable because the region will only be active if a datacenter has already been lost.
|
||||
The fact that satellite policies are configured per region allows for asymmetric configurations. For example, FoudnationDB can have a three datacenter setup where there are two datacenters on the west coast (WC1, WC2) and one datacenter on the east coast (EC1). The west coast region can be set as the preferred active region by setting the priority of its primary datacenter higher than the east coast datacenter. The west coast region should have a satellite policy configured, so that when it is active, FoundationDB is making mutations durable in both west coast datacenters. In the rare event that one of the west coast datacenters has failed, FoundationDB will fail over to the east coast datacenter. Because this region does not a satellite datacenter, the mutations will only be made durable in one datacenter while the transaction subsystem is located here. However, this is justifiable because the region will only be active if a datacenter has already been lost.
|
||||
|
||||
This is the region configuration that implements the example::
|
||||
|
||||
|
@ -669,7 +669,7 @@ To configure an existing database to regions, do the following steps:
|
|||
|
||||
4. Configure ``usable_regions=2``. This will cause the cluster to start copying data between the regions.
|
||||
|
||||
5. Watch ``status`` and wait until data movement is complete. This will mean signal that the remote datacenter has a full replica of all of the data in the database.
|
||||
5. Watch ``status`` and wait until data movement is complete. This will signal that the remote datacenter has a full replica of all of the data in the database.
|
||||
|
||||
6. Change the region configuration to have a non-negative priority for the primary datacenters in both regions. This will enable automatic failover between regions.
|
||||
|
||||
|
@ -680,7 +680,7 @@ When a primary datacenter fails, the cluster will go into a degraded state. It w
|
|||
|
||||
.. warning:: While a datacenter has failed, the maximum write throughput of the cluster will be roughly 1/3 of normal performance. This is because the transaction logs need to store all of the mutations being committed, so that once the other datacenter comes back online, it can replay history to catch back up.
|
||||
|
||||
To drop the dead datacenter do the follow steps:
|
||||
To drop the dead datacenter do the following steps:
|
||||
|
||||
1. Configure the region configuration so that the dead datacenter has a negative priority.
|
||||
|
||||
|
|
|
@ -10,38 +10,38 @@ macOS
|
|||
|
||||
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
|
||||
|
||||
* `FoundationDB-6.1.8.pkg <https://www.foundationdb.org/downloads/6.1.8/macOS/installers/FoundationDB-6.1.8.pkg>`_
|
||||
* `FoundationDB-6.1.9.pkg <https://www.foundationdb.org/downloads/6.1.9/macOS/installers/FoundationDB-6.1.9.pkg>`_
|
||||
|
||||
Ubuntu
|
||||
------
|
||||
|
||||
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
|
||||
|
||||
* `foundationdb-clients-6.1.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.8/ubuntu/installers/foundationdb-clients_6.1.7-1_amd64.deb>`_
|
||||
* `foundationdb-server-6.1.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.8/ubuntu/installers/foundationdb-server_6.1.7-1_amd64.deb>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.1.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.9/ubuntu/installers/foundationdb-clients_6.1.9-1_amd64.deb>`_
|
||||
* `foundationdb-server-6.1.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.9/ubuntu/installers/foundationdb-server_6.1.9-1_amd64.deb>`_ (depends on the clients package)
|
||||
|
||||
RHEL/CentOS EL6
|
||||
---------------
|
||||
|
||||
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
|
||||
|
||||
* `foundationdb-clients-6.1.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.8/rhel6/installers/foundationdb-clients-6.1.8-1.el6.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.1.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.8/rhel6/installers/foundationdb-server-6.1.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.1.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.9/rhel6/installers/foundationdb-clients-6.1.9-1.el6.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.1.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.9/rhel6/installers/foundationdb-server-6.1.9-1.el6.x86_64.rpm>`_ (depends on the clients package)
|
||||
|
||||
RHEL/CentOS EL7
|
||||
---------------
|
||||
|
||||
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
|
||||
|
||||
* `foundationdb-clients-6.1.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.8/rhel7/installers/foundationdb-clients-6.1.8-1.el7.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.1.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.8/rhel7/installers/foundationdb-server-6.1.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.1.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.9/rhel7/installers/foundationdb-clients-6.1.9-1.el7.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.1.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.9/rhel7/installers/foundationdb-server-6.1.9-1.el7.x86_64.rpm>`_ (depends on the clients package)
|
||||
|
||||
Windows
|
||||
-------
|
||||
|
||||
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
|
||||
|
||||
* `foundationdb-6.1.8-x64.msi <https://www.foundationdb.org/downloads/6.1.8/windows/installers/foundationdb-6.1.8-x64.msi>`_
|
||||
* `foundationdb-6.1.9-x64.msi <https://www.foundationdb.org/downloads/6.1.9/windows/installers/foundationdb-6.1.9-x64.msi>`_
|
||||
|
||||
API Language Bindings
|
||||
=====================
|
||||
|
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
|
|||
|
||||
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
|
||||
|
||||
* `foundationdb-6.1.8.tar.gz <https://www.foundationdb.org/downloads/6.1.8/bindings/python/foundationdb-6.1.8.tar.gz>`_
|
||||
* `foundationdb-6.1.9.tar.gz <https://www.foundationdb.org/downloads/6.1.9/bindings/python/foundationdb-6.1.9.tar.gz>`_
|
||||
|
||||
Ruby 1.9.3/2.0.0+
|
||||
-----------------
|
||||
|
||||
* `fdb-6.1.8.gem <https://www.foundationdb.org/downloads/6.1.8/bindings/ruby/fdb-6.1.8.gem>`_
|
||||
* `fdb-6.1.9.gem <https://www.foundationdb.org/downloads/6.1.9/bindings/ruby/fdb-6.1.9.gem>`_
|
||||
|
||||
Java 8+
|
||||
-------
|
||||
|
||||
* `fdb-java-6.1.8.jar <https://www.foundationdb.org/downloads/6.1.8/bindings/java/fdb-java-6.1.8.jar>`_
|
||||
* `fdb-java-6.1.8-javadoc.jar <https://www.foundationdb.org/downloads/6.1.8/bindings/java/fdb-java-6.1.8-javadoc.jar>`_
|
||||
* `fdb-java-6.1.9.jar <https://www.foundationdb.org/downloads/6.1.9/bindings/java/fdb-java-6.1.9.jar>`_
|
||||
* `fdb-java-6.1.9-javadoc.jar <https://www.foundationdb.org/downloads/6.1.9/bindings/java/fdb-java-6.1.9-javadoc.jar>`_
|
||||
|
||||
Go 1.11+
|
||||
--------
|
||||
|
|
|
@ -42,6 +42,15 @@ struct ClusterInterface {
|
|||
UID id() const { return openDatabase.getEndpoint().token; }
|
||||
NetworkAddress address() const { return openDatabase.getEndpoint().getPrimaryAddress(); }
|
||||
|
||||
bool hasMessage() {
|
||||
return openDatabase.getFuture().isReady() ||
|
||||
failureMonitoring.getFuture().isReady() ||
|
||||
databaseStatus.getFuture().isReady() ||
|
||||
ping.getFuture().isReady() ||
|
||||
getClientWorkers.getFuture().isReady() ||
|
||||
forceRecovery.getFuture().isReady();
|
||||
}
|
||||
|
||||
void initEndpoints() {
|
||||
openDatabase.getEndpoint( TaskClusterController );
|
||||
failureMonitoring.getEndpoint( TaskFailureMonitor );
|
||||
|
|
|
@ -392,17 +392,17 @@ namespace HTTP {
|
|||
responseID = iid->second;
|
||||
}
|
||||
event.detail("RequestIDReceived", responseID);
|
||||
if(requestID != responseID) {
|
||||
|
||||
// If the response code is 5xx (server error) then a response ID is not expected
|
||||
// so a missing id will be ignored but a mismatching id will still be an error.
|
||||
bool serverError = r->code >= 500 && r->code < 600;
|
||||
|
||||
// If request/response IDs do not match and either this is not a server error
|
||||
// or it is but the response ID is not empty then log an error.
|
||||
if(requestID != responseID && (!serverError || !responseID.empty()) ) {
|
||||
err = http_bad_request_id();
|
||||
|
||||
// Log a non-debug a error
|
||||
Severity sev = SevError;
|
||||
// If the response code is 5xx (server error) and the responseID is empty then just warn
|
||||
if(responseID.empty() && r->code >= 500 && r->code < 600) {
|
||||
sev = SevWarnAlways;
|
||||
}
|
||||
|
||||
TraceEvent(sev, "HTTPRequestFailedIDMismatch")
|
||||
TraceEvent(SevError, "HTTPRequestFailedIDMismatch")
|
||||
.detail("DebugID", conn->getDebugID())
|
||||
.detail("RemoteAddress", conn->getPeerAddress())
|
||||
.detail("Verb", verb)
|
||||
|
@ -433,6 +433,7 @@ namespace HTTP {
|
|||
return r;
|
||||
} catch(Error &e) {
|
||||
double elapsed = timer() - send_start;
|
||||
// A bad_request_id error would have already been logged in verbose mode before err is thrown above.
|
||||
if(CLIENT_KNOBS->HTTP_VERBOSE_LEVEL > 0 && e.code() != error_code_http_bad_request_id) {
|
||||
printf("[%s] HTTP *ERROR*=%s early=%d, time=%fs %s %s contentLen=%d [%d out]\n",
|
||||
conn->getDebugID().toString().c_str(), e.name(), earlyResponse, elapsed, verb.c_str(), resource.c_str(), contentLen, total_sent);
|
||||
|
|
|
@ -1004,8 +1004,6 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
|
|||
if( g_network )
|
||||
throw network_already_setup();
|
||||
|
||||
setThreadLocalDeterministicRandomSeed(platform::getRandomSeed());
|
||||
|
||||
if (!networkOptions.logClientInfo.present())
|
||||
networkOptions.logClientInfo = true;
|
||||
|
||||
|
@ -2747,7 +2745,8 @@ Future<Void> Transaction::commitMutations() {
|
|||
.detail("Size", transactionSize)
|
||||
.detail("NumMutations", tr.transaction.mutations.size())
|
||||
.detail("ReadConflictSize", tr.transaction.read_conflict_ranges.expectedSize())
|
||||
.detail("WriteConflictSize", tr.transaction.write_conflict_ranges.expectedSize());
|
||||
.detail("WriteConflictSize", tr.transaction.write_conflict_ranges.expectedSize())
|
||||
.detail("DebugIdentifier", trLogInfo ? trLogInfo->identifier : "");
|
||||
}
|
||||
|
||||
if(!apiVersionAtLeast(300)) {
|
||||
|
@ -2900,15 +2899,15 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
|
|||
|
||||
if (trLogInfo) {
|
||||
if (trLogInfo->identifier.empty()) {
|
||||
trLogInfo->identifier = printable(value.get());
|
||||
trLogInfo->identifier = value.get().printable();
|
||||
}
|
||||
else if (trLogInfo->identifier != printable(value.get())) {
|
||||
else if (trLogInfo->identifier != value.get().printable()) {
|
||||
TraceEvent(SevWarn, "CannotChangeDebugTransactionIdentifier").detail("PreviousIdentifier", trLogInfo->identifier).detail("NewIdentifier", value.get());
|
||||
throw client_invalid_operation();
|
||||
}
|
||||
}
|
||||
else {
|
||||
trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(printable(value.get()), TransactionLogInfo::DONT_LOG));
|
||||
trLogInfo = Reference<TransactionLogInfo>(new TransactionLogInfo(value.get().printable(), TransactionLogInfo::DONT_LOG));
|
||||
}
|
||||
break;
|
||||
|
||||
|
|
|
@ -371,10 +371,11 @@ struct StorageQueuingMetricsReply {
|
|||
Version durableVersion; // latest version durable on storage server
|
||||
double cpuUsage;
|
||||
double diskUsage;
|
||||
double localRateLimit;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, localTime, instanceID, bytesDurable, bytesInput, version, storageBytes, durableVersion, cpuUsage, diskUsage);
|
||||
serializer(ar, localTime, instanceID, bytesDurable, bytesInput, version, storageBytes, durableVersion, cpuUsage, diskUsage, localRateLimit);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -57,6 +57,9 @@ add_library(thirdparty ${FDBRPC_THIRD_PARTY_SRCS})
|
|||
if(NOT WIN32)
|
||||
target_compile_options(thirdparty BEFORE PRIVATE -w) # disable warnings for third party
|
||||
endif()
|
||||
if(USE_VALGRIND)
|
||||
target_link_libraries(thirdparty PUBLIC Valgrind)
|
||||
endif()
|
||||
|
||||
set(FDBRPC_SRCS_DISABLE_ACTOR_WITHOUT_WAIT_WARNING
|
||||
ActorFuzz.actor.cpp
|
||||
|
|
|
@ -404,6 +404,7 @@ struct Peer : NonCopyable {
|
|||
state ReplyPromise<Void> reply;
|
||||
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePing.getEndpoint() );
|
||||
state int64_t startingBytes = peer->bytesReceived;
|
||||
state int timeouts = 0;
|
||||
loop {
|
||||
choose {
|
||||
when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) {
|
||||
|
@ -411,7 +412,11 @@ struct Peer : NonCopyable {
|
|||
TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination);
|
||||
throw connection_failed();
|
||||
}
|
||||
if(timeouts > 1) {
|
||||
TraceEvent(SevWarnAlways, "ConnectionSlowPing").suppressFor(1.0).detail("WithAddr", peer->destination).detail("Timeouts", timeouts);
|
||||
}
|
||||
startingBytes = peer->bytesReceived;
|
||||
timeouts++;
|
||||
}
|
||||
when (wait( reply.getFuture() )) {
|
||||
break;
|
||||
|
|
|
@ -67,11 +67,12 @@ struct ModelHolder : NonCopyable, public ReferenceCounted<ModelHolder> {
|
|||
|
||||
struct LoadBalancedReply {
|
||||
double penalty;
|
||||
Optional<Error> error;
|
||||
LoadBalancedReply() : penalty(1.0) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar &ar) {
|
||||
serializer(ar, penalty);
|
||||
serializer(ar, penalty, error);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -83,24 +84,41 @@ Optional<LoadBalancedReply> getLoadBalancedReply(void*);
|
|||
// Returns false if we got an error that should result in reissuing the request
|
||||
template <class T>
|
||||
bool checkAndProcessResult(ErrorOr<T> result, Reference<ModelHolder> holder, bool atMostOnce, bool triedAllOptions) {
|
||||
int errCode = result.isError() ? result.getError().code() : error_code_success;
|
||||
bool maybeDelivered = errCode == error_code_broken_promise || errCode == error_code_request_maybe_delivered;
|
||||
bool receivedResponse = result.present() || (!maybeDelivered && errCode != error_code_process_behind);
|
||||
bool futureVersion = errCode == error_code_future_version || errCode == error_code_process_behind;
|
||||
|
||||
Optional<LoadBalancedReply> loadBalancedReply;
|
||||
if(!result.isError()) {
|
||||
loadBalancedReply = getLoadBalancedReply(&result.get());
|
||||
}
|
||||
|
||||
int errCode;
|
||||
if (loadBalancedReply.present()) {
|
||||
errCode = loadBalancedReply.get().error.present() ? loadBalancedReply.get().error.get().code() : error_code_success;
|
||||
}
|
||||
else {
|
||||
errCode = result.isError() ? result.getError().code() : error_code_success;
|
||||
}
|
||||
|
||||
bool maybeDelivered = errCode == error_code_broken_promise || errCode == error_code_request_maybe_delivered;
|
||||
bool receivedResponse = loadBalancedReply.present() ? !loadBalancedReply.get().error.present() : result.present();
|
||||
receivedResponse = receivedResponse || (!maybeDelivered && errCode != error_code_process_behind);
|
||||
bool futureVersion = errCode == error_code_future_version || errCode == error_code_process_behind;
|
||||
|
||||
holder->release(receivedResponse, futureVersion, loadBalancedReply.present() ? loadBalancedReply.get().penalty : -1.0);
|
||||
|
||||
if(result.present()) {
|
||||
|
||||
if (errCode == error_code_server_overloaded)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (loadBalancedReply.present() && !loadBalancedReply.get().error.present()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!loadBalancedReply.present() && result.present()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if(receivedResponse) {
|
||||
throw result.getError();
|
||||
throw loadBalancedReply.present() ? loadBalancedReply.get().error.get() : result.getError();
|
||||
}
|
||||
|
||||
if(atMostOnce && maybeDelivered) {
|
||||
|
|
|
@ -84,6 +84,19 @@ void ISimulator::displayWorkers() const
|
|||
return;
|
||||
}
|
||||
|
||||
void ISimulator::disableFor(const std::string& desc, double time) {
|
||||
disabledMap[desc] = map(::delay(time), [this, desc](Void v){ disabledMap.erase(desc); return Void(); });
|
||||
}
|
||||
|
||||
Future<Void> ISimulator::checkDisabled(const std::string& desc) const
|
||||
{
|
||||
auto iter = disabledMap.find(desc);
|
||||
if (iter != disabledMap.end()) {
|
||||
return iter->second;
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
namespace std {
|
||||
template<>
|
||||
class hash<Endpoint> {
|
||||
|
|
|
@ -311,6 +311,9 @@ public:
|
|||
virtual flowGlobalType global(int id) { return getCurrentProcess()->global(id); };
|
||||
virtual void setGlobal(size_t id, flowGlobalType v) { getCurrentProcess()->setGlobal(id,v); };
|
||||
|
||||
Future<Void> checkDisabled(const std::string& desc) const;
|
||||
void disableFor(const std::string& desc, double time);
|
||||
|
||||
static thread_local ProcessInfo* currentProcess;
|
||||
protected:
|
||||
Mutex mutex;
|
||||
|
@ -320,6 +323,7 @@ private:
|
|||
std::map<NetworkAddress, int> excludedAddresses;
|
||||
std::map<NetworkAddress, int> clearedAddresses;
|
||||
std::map<NetworkAddress, std::map<std::string, int>> roleAddresses;
|
||||
std::map<std::string, Future<Void>> disabledMap;
|
||||
bool allSwapsDisabled;
|
||||
};
|
||||
|
||||
|
|
|
@ -129,9 +129,11 @@ set(FDBSERVER_SRCS
|
|||
workloads/KVStoreTest.actor.cpp
|
||||
workloads/KillRegion.actor.cpp
|
||||
workloads/LockDatabase.actor.cpp
|
||||
workloads/LocalRatekeeper.actor.cpp
|
||||
workloads/LogMetrics.actor.cpp
|
||||
workloads/LowLatency.actor.cpp
|
||||
workloads/MachineAttrition.actor.cpp
|
||||
workloads/Mako.actor.cpp
|
||||
workloads/MemoryKeyValueStore.cpp
|
||||
workloads/MemoryKeyValueStore.h
|
||||
workloads/MemoryLifetime.actor.cpp
|
||||
|
|
|
@ -2705,6 +2705,16 @@ ACTOR Future<Void> clusterControllerCore( ClusterControllerFullInterface interf,
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> replaceInterface( ClusterControllerFullInterface interf ) {
|
||||
loop {
|
||||
if( interf.hasMessage() ) {
|
||||
wait(delay(SERVER_KNOBS->REPLACE_INTERFACE_DELAY));
|
||||
return Void();
|
||||
}
|
||||
wait(delay(SERVER_KNOBS->REPLACE_INTERFACE_CHECK_DELAY));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, bool hasConnected, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, LocalityData locality ) {
|
||||
loop {
|
||||
state ClusterControllerFullInterface cci;
|
||||
|
@ -2713,19 +2723,23 @@ ACTOR Future<Void> clusterController( ServerCoordinators coordinators, Reference
|
|||
try {
|
||||
//Register as a possible leader; wait to be elected
|
||||
state Future<Void> leaderFail = tryBecomeLeader( coordinators, cci, currentCC, hasConnected, asyncPriorityInfo );
|
||||
state Future<Void> shouldReplace = replaceInterface( cci );
|
||||
|
||||
while (!currentCC->get().present() || currentCC->get().get() != cci) {
|
||||
choose {
|
||||
when( wait(currentCC->onChange()) ) {}
|
||||
when( wait(leaderFail) ) { ASSERT(false); throw internal_error(); }
|
||||
when( wait(shouldReplace) ) { break; }
|
||||
}
|
||||
}
|
||||
if(!shouldReplace.isReady()) {
|
||||
shouldReplace = Future<Void>();
|
||||
hasConnected = true;
|
||||
startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
|
||||
inRole = true;
|
||||
|
||||
hasConnected = true;
|
||||
startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
|
||||
inRole = true;
|
||||
|
||||
wait( clusterControllerCore( cci, leaderFail, coordinators, locality ) );
|
||||
wait( clusterControllerCore( cci, leaderFail, coordinators, locality ) );
|
||||
}
|
||||
} catch(Error& e) {
|
||||
if (inRole)
|
||||
endRole(Role::CLUSTER_CONTROLLER, cci.id(), "Error", e.code() == error_code_actor_cancelled || e.code() == error_code_coordinators_changed, e);
|
||||
|
|
|
@ -50,6 +50,17 @@ struct ClusterControllerFullInterface {
|
|||
bool operator == (ClusterControllerFullInterface const& r) const { return id() == r.id(); }
|
||||
bool operator != (ClusterControllerFullInterface const& r) const { return id() != r.id(); }
|
||||
|
||||
bool hasMessage() {
|
||||
return clientInterface.hasMessage() ||
|
||||
recruitFromConfiguration.getFuture().isReady() ||
|
||||
recruitRemoteFromConfiguration.getFuture().isReady() ||
|
||||
recruitStorage.getFuture().isReady() ||
|
||||
registerWorker.getFuture().isReady() ||
|
||||
getWorkers.getFuture().isReady() ||
|
||||
registerMaster.getFuture().isReady() ||
|
||||
getServerDBInfo.getFuture().isReady();
|
||||
}
|
||||
|
||||
void initEndpoints() {
|
||||
clientInterface.initEndpoints();
|
||||
recruitFromConfiguration.getEndpoint( TaskClusterController );
|
||||
|
|
|
@ -355,17 +355,6 @@ struct CompactPreOrderTree {
|
|||
#endif
|
||||
};
|
||||
|
||||
std::string printable(const StringRef& val) {
|
||||
std::string s;
|
||||
for (int i = 0; i<val.size(); i++) {
|
||||
uint8_t b = val[i];
|
||||
if (b >= 32 && b < 127 && b != '\\') s += (char)b;
|
||||
else if (b == '\\') s += "\\\\";
|
||||
else s += format("\\x%02x", b);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
void compactMapTests(std::vector<std::string> testData, std::vector<std::string> sampleQueries, std::string prefixTreeDOTFile = "") {
|
||||
double t1, t2;
|
||||
int r = 0;
|
||||
|
|
|
@ -271,38 +271,22 @@ ACTOR Future<Void> leaderRegister(LeaderElectionRegInterface interf, Key key) {
|
|||
return Void();
|
||||
} else {
|
||||
Optional<LeaderInfo> nextNominee;
|
||||
if (availableLeaders.size() && availableCandidates.size()) {
|
||||
nextNominee = ( *availableLeaders.begin() < *availableCandidates.begin() ) ? *availableLeaders.begin() : *availableCandidates.begin();
|
||||
} else if (availableLeaders.size()) {
|
||||
nextNominee = *availableLeaders.begin();
|
||||
} else if (availableCandidates.size()) {
|
||||
if( availableCandidates.size() && (!availableLeaders.size() || availableLeaders.begin()->leaderChangeRequired(*availableCandidates.begin())) ) {
|
||||
nextNominee = *availableCandidates.begin();
|
||||
} else {
|
||||
nextNominee = Optional<LeaderInfo>();
|
||||
} else if( availableLeaders.size() ) {
|
||||
nextNominee = *availableLeaders.begin();
|
||||
}
|
||||
|
||||
bool foundCurrentNominee = false;
|
||||
if(currentNominee.present()) {
|
||||
for(auto& it : availableLeaders) {
|
||||
if(currentNominee.get().equalInternalId(it)) {
|
||||
foundCurrentNominee = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ( !nextNominee.present() || !foundCurrentNominee || currentNominee.get().leaderChangeRequired(nextNominee.get()) ) {
|
||||
TraceEvent("NominatingLeader").detail("Nominee", nextNominee.present() ? nextNominee.get().changeID : UID())
|
||||
.detail("Changed", nextNominee != currentNominee).detail("Key", key);
|
||||
if( !currentNominee.present() || !nextNominee.present() || !currentNominee.get().equalInternalId(nextNominee.get()) || nextNominee.get() > currentNominee.get() ) {
|
||||
TraceEvent("NominatingLeader").detail("NextNominee", nextNominee.present() ? nextNominee.get().changeID : UID())
|
||||
.detail("CurrentNominee", currentNominee.present() ? currentNominee.get().changeID : UID()).detail("Key", printable(key));
|
||||
for(unsigned int i=0; i<notify.size(); i++)
|
||||
notify[i].send( nextNominee );
|
||||
notify.clear();
|
||||
currentNominee = nextNominee;
|
||||
} else if (currentNominee.get().equalInternalId(nextNominee.get())) {
|
||||
// leader becomes better
|
||||
currentNominee = nextNominee;
|
||||
}
|
||||
|
||||
currentNominee = nextNominee;
|
||||
|
||||
if( availableLeaders.size() ) {
|
||||
nextInterval = delay( SERVER_KNOBS->POLLING_FREQUENCY );
|
||||
if(leaderIntervalCount++ > 5) {
|
||||
|
|
|
@ -327,6 +327,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( MAX_VERSION_DIFFERENCE, 20 * VERSIONS_PER_SECOND );
|
||||
init( FORCE_RECOVERY_CHECK_DELAY, 5.0 );
|
||||
init( RATEKEEPER_FAILURE_TIME, 1.0 );
|
||||
init( REPLACE_INTERFACE_DELAY, 60.0 );
|
||||
init( REPLACE_INTERFACE_CHECK_DELAY, 5.0 );
|
||||
|
||||
init( INCOMPATIBLE_PEERS_LOGGING_INTERVAL, 600 ); if( randomize && BUGGIFY ) INCOMPATIBLE_PEERS_LOGGING_INTERVAL = 60.0;
|
||||
init( EXPECTED_MASTER_FITNESS, ProcessClass::UnsetFit );
|
||||
|
@ -369,6 +371,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 500e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
|
||||
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 50e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
|
||||
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
|
||||
init( STORAGE_DURABILITY_LAG_SOFT_MAX, 20e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_SOFT_MAX = 10e6;
|
||||
init( STORAGE_DURABILITY_LAG_HARD_MAX, 200e6 ); if( smallStorageTarget ) STORAGE_DURABILITY_LAG_HARD_MAX = 100e6;
|
||||
|
||||
bool smallTlogTarget = randomize && BUGGIFY;
|
||||
init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
|
||||
|
|
|
@ -269,6 +269,8 @@ public:
|
|||
int64_t MAX_VERSION_DIFFERENCE;
|
||||
double FORCE_RECOVERY_CHECK_DELAY;
|
||||
double RATEKEEPER_FAILURE_TIME;
|
||||
double REPLACE_INTERFACE_DELAY;
|
||||
double REPLACE_INTERFACE_CHECK_DELAY;
|
||||
|
||||
// Knobs used to select the best policy (via monte carlo)
|
||||
int POLICY_RATING_TESTS; // number of tests per policy (in order to compare)
|
||||
|
@ -344,6 +346,8 @@ public:
|
|||
int FETCH_KEYS_PARALLELISM_BYTES;
|
||||
int BUGGIFY_BLOCK_BYTES;
|
||||
int64_t STORAGE_HARD_LIMIT_BYTES;
|
||||
int64_t STORAGE_DURABILITY_LAG_SOFT_MAX;
|
||||
int64_t STORAGE_DURABILITY_LAG_HARD_MAX;
|
||||
int STORAGE_COMMIT_BYTES;
|
||||
double STORAGE_COMMIT_INTERVAL;
|
||||
double UPDATE_SHARD_VERSION_INTERVAL;
|
||||
|
|
|
@ -41,6 +41,7 @@ enum limitReason_t {
|
|||
storage_server_min_free_space_ratio, // a storage server's normal limits are being reduced by a low free space ratio
|
||||
log_server_min_free_space,
|
||||
log_server_min_free_space_ratio,
|
||||
storage_server_read_load,
|
||||
limitReason_t_end
|
||||
};
|
||||
|
||||
|
@ -56,7 +57,8 @@ const char* limitReasonName[] = {
|
|||
"storage_server_min_free_space",
|
||||
"storage_server_min_free_space_ratio",
|
||||
"log_server_min_free_space",
|
||||
"log_server_min_free_space_ratio"
|
||||
"log_server_min_free_space_ratio",
|
||||
"storage_server_read_load"
|
||||
};
|
||||
static_assert(sizeof(limitReasonName) / sizeof(limitReasonName[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
||||
|
@ -73,6 +75,7 @@ const char* limitReasonDesc[] = {
|
|||
"Storage server running out of space (approaching 5% limit).",
|
||||
"Log server running out of space (approaching 100MB limit).",
|
||||
"Log server running out of space (approaching 5% limit).",
|
||||
"Storage server is overwhelmed by read workload",
|
||||
};
|
||||
|
||||
static_assert(sizeof(limitReasonDesc) / sizeof(limitReasonDesc[0]) == limitReason_t_end, "limitReasonDesc table size");
|
||||
|
@ -87,6 +90,8 @@ struct StorageQueueInfo {
|
|||
Smoother smoothDurableVersion, smoothLatestVersion;
|
||||
Smoother smoothFreeSpace;
|
||||
Smoother smoothTotalSpace;
|
||||
double localRateLimit;
|
||||
limitReason_t limitReason;
|
||||
StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
|
||||
|
@ -185,6 +190,7 @@ ACTOR Future<Void> trackStorageServerQueueInfo( RatekeeperData* self, StorageSer
|
|||
myQueueInfo->value.valid = true;
|
||||
myQueueInfo->value.prevReply = myQueueInfo->value.lastReply;
|
||||
myQueueInfo->value.lastReply = reply.get();
|
||||
myQueueInfo->value.localRateLimit = reply.get().localRateLimit;
|
||||
if (myQueueInfo->value.prevReply.instanceID != reply.get().instanceID) {
|
||||
myQueueInfo->value.smoothDurableBytes.reset(reply.get().bytesDurable);
|
||||
myQueueInfo->value.verySmoothDurableBytes.reset(reply.get().bytesDurable);
|
||||
|
@ -353,7 +359,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
|
||||
std::map<UID, limitReason_t> ssReasons;
|
||||
|
||||
// Look at each storage server's write queue, compute and store the desired rate ratio
|
||||
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
|
||||
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
|
||||
auto& ss = i->value;
|
||||
if (!ss.valid) continue;
|
||||
|
@ -429,6 +435,14 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
}
|
||||
}
|
||||
|
||||
if (i->value.localRateLimit < 0.99) {
|
||||
auto lim = double(self->actualTpsMetric) * i->value.localRateLimit;
|
||||
if (lim < limitTps) {
|
||||
limitTps = lim;
|
||||
ssLimitReason = limitReason_t::storage_server_read_load;
|
||||
}
|
||||
}
|
||||
|
||||
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
|
||||
|
||||
if (limitTps < limits->tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
|
||||
|
|
|
@ -154,6 +154,7 @@
|
|||
<ActorCompiler Include="workloads\Serializability.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\DiskDurability.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\SnapTest.actor.cpp" />
|
||||
<ActorCompiler Include="workloads\Mako.actor.cpp" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ClInclude Include="ApplyMetadataMutation.h" />
|
||||
|
|
|
@ -287,6 +287,9 @@
|
|||
<ActorCompiler Include="workloads\DiskDurability.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="workloads\Mako.actor.cpp">
|
||||
<Filter>workloads</Filter>
|
||||
</ActorCompiler>
|
||||
<ActorCompiler Include="OldTLogServer.actor.cpp" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
|
|
|
@ -51,6 +51,7 @@
|
|||
#include "fdbserver/LatencyBandConfig.h"
|
||||
#include "fdbserver/FDBExecHelper.actor.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include <type_traits>
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
using std::pair;
|
||||
|
@ -307,6 +308,27 @@ public:
|
|||
VersionedData const& data() const { return versionedData; }
|
||||
VersionedData& mutableData() { return versionedData; }
|
||||
|
||||
double old_rate = 1.0;
|
||||
double currentRate() {
|
||||
auto versionLag = version.get() - durableVersion.get();
|
||||
double res;
|
||||
if (versionLag >= SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX) {
|
||||
res = 0.0;
|
||||
} else if (versionLag > SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX) {
|
||||
res = 1.0 - (double(versionLag) / double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX));
|
||||
} else {
|
||||
res = 1.0;
|
||||
}
|
||||
if (res != old_rate) {
|
||||
TraceEvent(SevDebug, "LocalRatekeeperChange", thisServerID)
|
||||
.detail("Old", old_rate)
|
||||
.detail("New", res)
|
||||
.detail("NDV", versionLag);
|
||||
old_rate = res;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
void addMutationToMutationLogOrStorage( Version ver, MutationRef m ); // Appends m to mutationLog@ver, or to storage if ver==invalidVersion
|
||||
|
||||
// Update the byteSample, and write the updates to the mutation log@ver, or to storage if ver==invalidVersion
|
||||
|
@ -485,6 +507,7 @@ public:
|
|||
specialCounter(cc, "DurableVersion", [self](){ return self->durableVersion.get(); });
|
||||
specialCounter(cc, "DesiredOldestVersion", [self](){ return self->desiredOldestVersion.get(); });
|
||||
specialCounter(cc, "VersionLag", [self](){ return self->versionLag; });
|
||||
specialCounter(cc, "LocalRatekeeper", [self]{ return self->currentRate(); });
|
||||
|
||||
specialCounter(cc, "FetchKeysFetchActive", [self](){ return self->fetchKeysParallelismLock.activePermits(); });
|
||||
specialCounter(cc, "FetchKeysWaiting", [self](){ return self->fetchKeysParallelismLock.waiters(); });
|
||||
|
@ -592,7 +615,39 @@ public:
|
|||
}
|
||||
|
||||
double getPenalty() {
|
||||
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
|
||||
return std::max(std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER -
|
||||
2.0 * SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) /
|
||||
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER),
|
||||
(currentRate() < 1e-6 ? 1e6 : 1.0 / currentRate()));
|
||||
}
|
||||
|
||||
template<class Reply>
|
||||
using isLoadBalancedReply = std::is_base_of<LoadBalancedReply, Reply>;
|
||||
|
||||
template <class Reply>
|
||||
static typename std::enable_if<isLoadBalancedReply<Reply>::value, void>::type sendErrorWithPenalty(
|
||||
const ReplyPromise<Reply>& promise, const Error& err, double penalty) {
|
||||
Reply reply;
|
||||
reply.error = err;
|
||||
reply.penalty = penalty;
|
||||
promise.send(reply);
|
||||
}
|
||||
|
||||
template <class Reply>
|
||||
static typename std::enable_if<!isLoadBalancedReply<Reply>::value, void>::type sendErrorWithPenalty(
|
||||
const ReplyPromise<Reply>& promise, const Error& err, double) {
|
||||
promise.sendError(err);
|
||||
}
|
||||
|
||||
template<class Request, class HandleFunction>
|
||||
Future<Void> readGuard(const Request& request, const HandleFunction& fun) {
|
||||
auto rate = currentRate();
|
||||
if (rate < 0.8 && deterministicRandom()->random01() > rate) {
|
||||
//request.error = future_version();
|
||||
sendErrorWithPenalty(request.reply, server_overloaded(), getPenalty());
|
||||
return Void();
|
||||
}
|
||||
return fun(this, request);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -832,7 +887,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
|||
} catch (Error& e) {
|
||||
if(!canReplyWith(e))
|
||||
throw;
|
||||
req.reply.sendError(e);
|
||||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||
}
|
||||
|
||||
++data->counters.finishedQueries;
|
||||
|
@ -877,7 +932,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
|||
|
||||
if( data->watchBytes > SERVER_KNOBS->MAX_STORAGE_SERVER_WATCH_BYTES ) {
|
||||
TEST(true); //Too many watches, reverting to polling
|
||||
req.reply.sendError( watch_cancelled() );
|
||||
data->sendErrorWithPenalty(req.reply, watch_cancelled(), data->getPenalty());
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -900,7 +955,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
|||
} catch (Error& e) {
|
||||
if(!canReplyWith(e))
|
||||
throw;
|
||||
req.reply.sendError(e);
|
||||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -921,7 +976,7 @@ ACTOR Future<Void> watchValueQ( StorageServer* data, WatchValueRequest req ) {
|
|||
return Void();
|
||||
}
|
||||
when( wait( timeoutDelay < 0 ? Never() : delay(timeoutDelay) ) ) {
|
||||
req.reply.sendError( timed_out() );
|
||||
data->sendErrorWithPenalty(req.reply, timed_out(), data->getPenalty());
|
||||
return Void();
|
||||
}
|
||||
when( wait( data->noRecentUpdates.onChange()) ) {}
|
||||
|
@ -962,7 +1017,7 @@ ACTOR Future<Void> getShardStateQ( StorageServer* data, GetShardStateRequest req
|
|||
choose {
|
||||
when( wait( getShardState_impl( data, req ) ) ) {}
|
||||
when( wait( delay( g_network->isSimulated() ? 10 : 60 ) ) ) {
|
||||
req.reply.sendError( timed_out() );
|
||||
data->sendErrorWithPenalty(req.reply, timed_out(), data->getPenalty());
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
|
@ -1374,7 +1429,7 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
} catch (Error& e) {
|
||||
if(!canReplyWith(e))
|
||||
throw;
|
||||
req.reply.sendError(e);
|
||||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||
}
|
||||
|
||||
++data->counters.finishedQueries;
|
||||
|
@ -1431,7 +1486,7 @@ ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
|
|||
//if (e.code() == error_code_wrong_shard_server) TraceEvent("WrongShardServer").detail("In","getKey");
|
||||
if(!canReplyWith(e))
|
||||
throw;
|
||||
req.reply.sendError(e);
|
||||
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
|
||||
}
|
||||
|
||||
++data->counters.finishedQueries;
|
||||
|
@ -1453,6 +1508,7 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const&
|
|||
reply.bytesDurable = self->counters.bytesDurable.getValue();
|
||||
|
||||
reply.storageBytes = self->storage.getStorageBytes();
|
||||
reply.localRateLimit = self->currentRate();
|
||||
|
||||
reply.version = self->version.get();
|
||||
reply.cpuUsage = self->cpuUsage;
|
||||
|
@ -2802,6 +2858,9 @@ ACTOR Future<Void> update( StorageServer* data, bool* pReceivedUpdate )
|
|||
ACTOR Future<Void> updateStorage(StorageServer* data) {
|
||||
loop {
|
||||
ASSERT( data->durableVersion.get() == data->storageVersion() );
|
||||
if (g_network->isSimulated()) {
|
||||
wait(g_pSimulator->checkDisabled(format("%s/updateStorage", data->thisServerID.toString().c_str())));
|
||||
}
|
||||
wait( data->desiredOldestVersion.whenAtLeast( data->storageVersion()+1 ) );
|
||||
wait( delay(0, TaskUpdateStorage) );
|
||||
|
||||
|
@ -3358,7 +3417,7 @@ ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi
|
|||
when (WaitMetricsRequest req = waitNext(ssi.waitMetrics.getFuture())) {
|
||||
if (!self->isReadable( req.keys )) {
|
||||
TEST( true ); // waitMetrics immediate wrong_shard_server()
|
||||
req.reply.sendError(wrong_shard_server());
|
||||
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
|
||||
} else {
|
||||
actors.add( self->metrics.waitMetrics( req, delayJittered( SERVER_KNOBS->STORAGE_METRIC_TIMEOUT ) ) );
|
||||
}
|
||||
|
@ -3366,7 +3425,7 @@ ACTOR Future<Void> metricsCore( StorageServer* self, StorageServerInterface ssi
|
|||
when (SplitMetricsRequest req = waitNext(ssi.splitMetrics.getFuture())) {
|
||||
if (!self->isReadable( req.keys )) {
|
||||
TEST( true ); // splitMetrics immediate wrong_shard_server()
|
||||
req.reply.sendError(wrong_shard_server());
|
||||
self->sendErrorWithPenalty(req.reply, wrong_shard_server(), self->getPenalty());
|
||||
} else {
|
||||
self->metrics.splitMetrics( req );
|
||||
}
|
||||
|
@ -3475,20 +3534,20 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
|
||||
req.reply.send(GetValueReply());
|
||||
else
|
||||
actors.add( getValueQ( self, req ) );
|
||||
actors.add(self->readGuard(req , getValueQ));
|
||||
}
|
||||
when( WatchValueRequest req = waitNext(ssi.watchValue.getFuture()) ) {
|
||||
// TODO: fast load balancing?
|
||||
// SOMEDAY: combine watches for the same key/value into a single watch
|
||||
actors.add( watchValueQ( self, req ) );
|
||||
actors.add(self->readGuard(req, watchValueQ));
|
||||
}
|
||||
when (GetKeyRequest req = waitNext(ssi.getKey.getFuture())) {
|
||||
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
|
||||
actors.add( getKey( self, req ) );
|
||||
actors.add(self->readGuard(req , getKey));
|
||||
}
|
||||
when (GetKeyValuesRequest req = waitNext(ssi.getKeyValues.getFuture()) ) {
|
||||
// Warning: This code is executed at extremely high priority (TaskLoadBalancedEndpoint), so downgrade before doing real work
|
||||
actors.add( getKeyValues( self, req ) );
|
||||
actors.add(self->readGuard(req , getKeyValues));
|
||||
}
|
||||
when (GetShardStateRequest req = waitNext(ssi.getShardState.getFuture()) ) {
|
||||
if (req.mode == GetShardStateRequest::NO_WAIT ) {
|
||||
|
|
|
@ -102,13 +102,13 @@ struct CycleWorkload : TestWorkload {
|
|||
try {
|
||||
// Reverse next and next^2 node
|
||||
Optional<Value> v = wait( tr.get( self->key(r) ) );
|
||||
if (!v.present()) self->badRead("r", r, tr);
|
||||
if (!v.present()) self->badRead("KeyR", r, tr);
|
||||
state int r2 = self->fromValue(v.get());
|
||||
Optional<Value> v2 = wait( tr.get( self->key(r2) ) );
|
||||
if (!v2.present()) self->badRead("r2", r2, tr);
|
||||
if (!v2.present()) self->badRead("KeyR2", r2, tr);
|
||||
state int r3 = self->fromValue(v2.get());
|
||||
Optional<Value> v3 = wait( tr.get( self->key(r3) ) );
|
||||
if (!v3.present()) self->badRead("r3", r3, tr);
|
||||
if (!v3.present()) self->badRead("KeyR3", r3, tr);
|
||||
int r4 = self->fromValue(v3.get());
|
||||
|
||||
tr.clear( self->key(r) ); //< Shouldn't have an effect, but will break with wrong ordering
|
||||
|
|
|
@ -0,0 +1,119 @@
|
|||
#include "workloads.actor.h"
|
||||
#include <fdbserver/Knobs.h>
|
||||
#include <flow/actorcompiler.h>
|
||||
|
||||
namespace {
|
||||
|
||||
ACTOR Future<StorageServerInterface> getRandomStorage(Database cx) {
|
||||
state Transaction tr(cx);
|
||||
loop {
|
||||
try {
|
||||
tr.reset();
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
Standalone<RangeResultRef> range = wait(tr.getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
if (range.size() > 0) {
|
||||
auto idx = deterministicRandom()->randomInt(0, range.size());
|
||||
return decodeServerListValue(range[idx].value);
|
||||
} else {
|
||||
wait(delay(1.0));
|
||||
}
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct LocalRatekeeperWorkload : TestWorkload {
|
||||
|
||||
double startAfter = 0.0;
|
||||
double blockWritesFor;
|
||||
bool testFailed = false;
|
||||
|
||||
LocalRatekeeperWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
startAfter = getOption(options, LiteralStringRef("startAfter"), startAfter);
|
||||
blockWritesFor = getOption(options, LiteralStringRef("blockWritesFor"),
|
||||
double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX)/double(1e6));
|
||||
}
|
||||
virtual std::string description() { return "LocalRatekeeperWorkload"; }
|
||||
|
||||
ACTOR static Future<Void> testStorage(LocalRatekeeperWorkload* self, Database cx, StorageServerInterface ssi) {
|
||||
state Transaction tr(cx);
|
||||
state std::vector<Future<GetValueReply>> requests;
|
||||
requests.reserve(100);
|
||||
loop {
|
||||
state StorageQueuingMetricsReply metrics = wait(ssi.getQueuingMetrics.getReply(StorageQueuingMetricsRequest{}));
|
||||
auto durabilityLag = metrics.version - metrics.durableVersion;
|
||||
double expectedRateLimit = 1.0;
|
||||
if (durabilityLag >= SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX) {
|
||||
expectedRateLimit = 0.0;
|
||||
} else if (durabilityLag > SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX) {
|
||||
expectedRateLimit = 1.0 - double(durabilityLag) / double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_HARD_MAX);
|
||||
}
|
||||
if (expectedRateLimit < metrics.localRateLimit - 0.01 || expectedRateLimit > metrics.localRateLimit + 0.01) {
|
||||
self->testFailed = true;
|
||||
TraceEvent(SevError, "StorageRateLimitTooFarOff")
|
||||
.detail("Storage", ssi.id())
|
||||
.detail("Expected", expectedRateLimit)
|
||||
.detail("Actual", metrics.localRateLimit);
|
||||
}
|
||||
tr.reset();
|
||||
Version readVersion = wait(tr.getReadVersion());
|
||||
requests.clear();
|
||||
// we send 100 requests to this storage node and count how many of those get rejected
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
GetValueRequest req;
|
||||
req.version = readVersion;
|
||||
// we don't care about the value
|
||||
req.key = LiteralStringRef("/lkfs");
|
||||
requests.emplace_back(ssi.getValue.getReply(req));
|
||||
}
|
||||
wait(waitForAllReady(requests));
|
||||
int failedRequests = 0;
|
||||
int errors = 0;
|
||||
for (const auto& resp : requests) {
|
||||
if (resp.isError()) {
|
||||
self->testFailed = true;
|
||||
++errors;
|
||||
TraceEvent(SevError, "LoadBalancedResponseReturnedError")
|
||||
.error(resp.getError());
|
||||
} else if (resp.get().error.present() && resp.get().error.get().code() == error_code_future_version) {
|
||||
++failedRequests;
|
||||
}
|
||||
if (errors > 9) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
TraceEvent("RejectedVersions")
|
||||
.detail("NumRejected", failedRequests);
|
||||
if (self->testFailed) {
|
||||
return Void();
|
||||
}
|
||||
wait(delay(5.0));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> _start(LocalRatekeeperWorkload* self, Database cx) {
|
||||
wait(delay(self->startAfter));
|
||||
state StorageServerInterface ssi = wait(getRandomStorage(cx));
|
||||
g_simulator.disableFor(format("%s/updateStorage", ssi.id().toString().c_str()), self->blockWritesFor);
|
||||
state Future<Void> done = delay(self->blockWritesFor);
|
||||
// not much will happen until the storage goes over the soft limit
|
||||
wait(delay(double(SERVER_KNOBS->STORAGE_DURABILITY_LAG_SOFT_MAX/1e6)));
|
||||
wait(testStorage(self, cx, ssi) || done);
|
||||
return Void();
|
||||
}
|
||||
|
||||
virtual Future<Void> start(Database const& cx) {
|
||||
// we run this only on one client
|
||||
if (clientId != 0 || !g_network->isSimulated()) {
|
||||
return Void();
|
||||
}
|
||||
return _start(this, cx);
|
||||
}
|
||||
virtual Future<bool> check(Database const& cx) { return !testFailed; }
|
||||
virtual void getMetrics(vector<PerfMetric>& m) {}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
WorkloadFactory<LocalRatekeeperWorkload> LocalRatekeeperWorkloadFactory("LocalRatekeeper");
|
|
@ -0,0 +1,532 @@
|
|||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
#include "fdbserver/workloads/BulkSetup.actor.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
|
||||
enum {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_UPDATE, OP_INSERT, OP_INSERTRANGE, OP_CLEAR, OP_SETCLEAR, OP_CLEARRANGE, OP_SETCLEARRANGE, OP_COMMIT, MAX_OP};
|
||||
enum {OP_COUNT, OP_RANGE};
|
||||
constexpr int MAXKEYVALUESIZE = 1000;
|
||||
constexpr int RANGELIMIT = 10000;
|
||||
struct MakoWorkload : TestWorkload {
|
||||
uint64_t rowCount, seqNumLen, sampleSize, actorCountPerClient, keyBytes, maxValueBytes, minValueBytes;
|
||||
double testDuration, loadTime, warmingDelay, maxInsertRate, transactionsPerSecond, allowedLatency, periodicLoggingInterval;
|
||||
bool enableLogging, commitGet, populateData, runBenchmark, preserveData;
|
||||
PerfIntCounter xacts, retries, conflicts, commits, totalOps;
|
||||
std::vector<PerfIntCounter> opCounters;
|
||||
std::vector<uint64_t> insertionCountsToMeasure;
|
||||
std::vector<std::pair<uint64_t, double>> ratesAtKeyCounts;
|
||||
std::string operationsSpec;
|
||||
//store operations to execute
|
||||
int operations[MAX_OP][2];
|
||||
// used for periodically tracing
|
||||
std::vector<PerfMetric> periodicMetrics;
|
||||
// store latency of each operation with sampling
|
||||
std::vector<ContinuousSample<double>> opLatencies;
|
||||
// prefix of keys populated, e.g. 'mako00000xxxxxxx'
|
||||
const std::string KEYPREFIX = "mako";
|
||||
const int KEYPREFIXLEN = KEYPREFIX.size();
|
||||
const std::array<std::string, MAX_OP> opNames = {"GRV", "GET", "GETRANGE", "SGET", "SGETRANGE", "UPDATE", "INSERT", "INSERTRANGE", "CLEAR", "SETCLEAR", "CLEARRANGE", "SETCLEARRANGE", "COMMIT"};
|
||||
MakoWorkload(WorkloadContext const& wcx)
|
||||
: TestWorkload(wcx),
|
||||
xacts("Transactions"), retries("Retries"), conflicts("Conflicts"), commits("Commits"), totalOps("Operations"),
|
||||
loadTime(0.0)
|
||||
{
|
||||
// init parameters from test file
|
||||
// Number of rows populated
|
||||
rowCount = getOption(options, LiteralStringRef("rows"), 10000);
|
||||
// Test duration in seconds
|
||||
testDuration = getOption(options, LiteralStringRef("testDuration"), 30.0);
|
||||
warmingDelay = getOption(options, LiteralStringRef("warmingDelay"), 0.0);
|
||||
maxInsertRate = getOption(options, LiteralStringRef("maxInsertRate"), 1e12);
|
||||
// Flag to control whether to populate data into database
|
||||
populateData = getOption(options, LiteralStringRef("populateData"), true);
|
||||
// Flag to control whether to run benchmark
|
||||
runBenchmark = getOption(options, LiteralStringRef("runBenchmark"), true);
|
||||
// Flag to control whether to clean data in the database
|
||||
preserveData = getOption(options, LiteralStringRef("preserveData"), true);
|
||||
// If true, force commit for read-only transactions
|
||||
commitGet = getOption(options, LiteralStringRef("commitGet"), false);
|
||||
// Target total transaction-per-second (TPS) of all clients
|
||||
transactionsPerSecond = getOption(options, LiteralStringRef("transactionsPerSecond"), 100000.0) / clientCount;
|
||||
actorCountPerClient = getOption(options, LiteralStringRef("actorCountPerClient"), 16);
|
||||
// Sampling rate (1 sample / <sampleSize> ops) for latency stats
|
||||
sampleSize = getOption(options, LiteralStringRef("sampleSize"), rowCount / 100);
|
||||
// If true, record latency metrics per periodicLoggingInterval; For details, see tracePeriodically()
|
||||
enableLogging = getOption(options, LiteralStringRef("enableLogging"), false);
|
||||
periodicLoggingInterval = getOption( options, LiteralStringRef("periodicLoggingInterval"), 5.0 );
|
||||
// Specified length of keys and length range of values
|
||||
keyBytes = std::max( getOption( options, LiteralStringRef("keyBytes"), 16 ), 16);
|
||||
maxValueBytes = getOption( options, LiteralStringRef("valueBytes"), 16 );
|
||||
minValueBytes = getOption( options, LiteralStringRef("minValueBytes"), maxValueBytes);
|
||||
ASSERT(minValueBytes <= maxValueBytes);
|
||||
// The inserted key is formatted as: fixed prefix('mako') + sequential number + padding('x')
|
||||
// assume we want to insert 10000 rows with keyBytes set to 16,
|
||||
// then the key goes from 'mako00000xxxxxxx' to 'mako09999xxxxxxx'
|
||||
seqNumLen = digits(rowCount);
|
||||
// check keyBytes, maxValueBytes is valid
|
||||
ASSERT(seqNumLen + KEYPREFIXLEN <= keyBytes);
|
||||
ASSERT(keyBytes <= MAXKEYVALUESIZE);
|
||||
ASSERT(maxValueBytes <= MAXKEYVALUESIZE);
|
||||
// user input: a sequence of operations to be executed; e.g. "g10i5" means to do GET 10 times and Insert 5 times
|
||||
// One operation type is defined as "<Type><Count>" or "<Type><Count>:<Range>".
|
||||
// When Count is omitted, it's equivalent to setting it to 1. (e.g. "g" is equivalent to "g1")
|
||||
// Multiple operation types can be concatenated. (e.g. "g9u1" = 9 GETs and 1 update)
|
||||
// For RANGE operations, "Range" needs to be specified in addition to "Count".
|
||||
// Below are all allowed inputs:
|
||||
// g – GET
|
||||
// gr – GET RANGE
|
||||
// sg – Snapshot GET
|
||||
// sgr – Snapshot GET RANGE
|
||||
// u – Update (= GET followed by SET)
|
||||
// i – Insert (= SET with a new key)
|
||||
// ir – Insert Range (Sequential)
|
||||
// c – CLEAR
|
||||
// sc – SET & CLEAR
|
||||
// cr – CLEAR RANGE
|
||||
// scr – SET & CLEAR RANGE
|
||||
// grv – GetReadVersion()
|
||||
// Every transaction is committed unless it contains only GET / GET RANGE operations.
|
||||
operationsSpec = getOption(options, LiteralStringRef("operations"), LiteralStringRef("g100")).contents().toString();
|
||||
// parse the sequence and extract operations to be executed
|
||||
parseOperationsSpec();
|
||||
for (int i = 0; i < MAX_OP; ++i) {
|
||||
// initilize per-operation latency record
|
||||
opLatencies.push_back(ContinuousSample<double>(rowCount / sampleSize));
|
||||
// initialize per-operation counter
|
||||
opCounters.push_back(PerfIntCounter(opNames[i]));
|
||||
}
|
||||
}
|
||||
|
||||
std::string description() override {
|
||||
// Mako is a simple workload to measure the performance of FDB.
|
||||
// The primary purpose of this benchmark is to generate consistent performance results
|
||||
return "Mako";
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
// use all the clients to populate data
|
||||
if (populateData)
|
||||
return _setup(cx, this);
|
||||
return Void();
|
||||
}
|
||||
|
||||
Future<Void> start(Database const& cx) override {
|
||||
return _start(cx, this);
|
||||
}
|
||||
|
||||
Future<bool> check(Database const& cx) override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
// metrics of population process
|
||||
if (populateData){
|
||||
m.push_back( PerfMetric( "Mean load time (seconds)", loadTime, true ) );
|
||||
// The importing rate of keys, controlled by parameter "insertionCountsToMeasure"
|
||||
auto ratesItr = ratesAtKeyCounts.begin();
|
||||
for(; ratesItr != ratesAtKeyCounts.end(); ratesItr++){
|
||||
m.push_back(PerfMetric(format("%ld keys imported bytes/sec", ratesItr->first), ratesItr->second, false));
|
||||
}
|
||||
}
|
||||
// benchmark
|
||||
if (runBenchmark){
|
||||
m.push_back(PerfMetric("Measured Duration", testDuration, true));
|
||||
m.push_back(xacts.getMetric());
|
||||
m.push_back(PerfMetric("Transactions/sec", xacts.getValue() / testDuration, true));
|
||||
m.push_back(totalOps.getMetric());
|
||||
m.push_back(PerfMetric("Operations/sec", totalOps.getValue() / testDuration, true));
|
||||
m.push_back(conflicts.getMetric());
|
||||
m.push_back(PerfMetric("Conflicts/sec", conflicts.getValue() / testDuration, true));
|
||||
m.push_back(retries.getMetric());
|
||||
|
||||
// count of each operation
|
||||
for (int i = 0; i < MAX_OP; ++i){
|
||||
m.push_back(opCounters[i].getMetric());
|
||||
}
|
||||
|
||||
// Meaningful Latency metrics
|
||||
const int opExecutedAtOnce[] = {OP_GETREADVERSION, OP_GET, OP_GETRANGE, OP_SGET, OP_SGETRANGE, OP_COMMIT};
|
||||
for (const int& op : opExecutedAtOnce){
|
||||
m.push_back(PerfMetric("Mean " + opNames[op] +" Latency (ms)", 1000 * opLatencies[op].mean(), true));
|
||||
m.push_back(PerfMetric("Max " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].max(), true));
|
||||
m.push_back(PerfMetric("Min " + opNames[op] + " Latency (ms, averaged)", 1000 * opLatencies[op].min(), true));
|
||||
}
|
||||
|
||||
//insert logging metrics if exists
|
||||
m.insert(m.end(), periodicMetrics.begin(), periodicMetrics.end());
|
||||
}
|
||||
}
|
||||
static std::string randStr(int len) {
|
||||
std::string result(len, '.');
|
||||
for (int i = 0; i < len; ++i) {
|
||||
result[i] = deterministicRandom()->randomAlphaNumeric();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static void randStr(char *str, int len){
|
||||
for (int i = 0; i < len; ++i) {
|
||||
str[i] = deterministicRandom()->randomAlphaNumeric();
|
||||
}
|
||||
}
|
||||
|
||||
Value randomValue() {
|
||||
const int length = deterministicRandom()->randomInt(minValueBytes, maxValueBytes + 1);
|
||||
std::string valueString = randStr(length);
|
||||
return StringRef(reinterpret_cast<const uint8_t*>(valueString.c_str()), length);
|
||||
}
|
||||
|
||||
Key keyForIndex(uint64_t ind) {
|
||||
Key result = makeString(keyBytes);
|
||||
char* data = reinterpret_cast<char*>(mutateString(result));
|
||||
format((KEYPREFIX + "%0*d").c_str(), seqNumLen, ind).copy(data, KEYPREFIXLEN + seqNumLen);
|
||||
for (int i = KEYPREFIXLEN + seqNumLen; i < keyBytes; ++i)
|
||||
data[i] = 'x';
|
||||
return result;
|
||||
}
|
||||
|
||||
/* number of digits */
|
||||
static uint64_t digits(uint64_t num) {
|
||||
uint64_t digits = 0;
|
||||
while (num > 0) {
|
||||
num /= 10;
|
||||
digits++;
|
||||
}
|
||||
return digits;
|
||||
}
|
||||
Standalone<KeyValueRef> operator()(uint64_t n) {
|
||||
return KeyValueRef(keyForIndex(n), randomValue());
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> tracePeriodically( MakoWorkload *self){
|
||||
state double start = now();
|
||||
state double elapsed = 0.0;
|
||||
state int64_t last_ops = 0;
|
||||
state int64_t last_xacts = 0;
|
||||
|
||||
loop {
|
||||
elapsed += self->periodicLoggingInterval;
|
||||
wait( delayUntil(start + elapsed));
|
||||
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->opLatencies[OP_COMMIT].mean()).detail("Median", self->opLatencies[OP_COMMIT].median()).detail("Percentile5", self->opLatencies[OP_COMMIT].percentile(.05)).detail("Percentile95", self->opLatencies[OP_COMMIT].percentile(.95)).detail("Count", self->opCounters[OP_COMMIT].getValue()).detail("Elapsed", elapsed);
|
||||
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->opLatencies[OP_GETREADVERSION].mean()).detail("Median", self->opLatencies[OP_GETREADVERSION].median()).detail("Percentile5", self->opLatencies[OP_GETREADVERSION].percentile(.05)).detail("Percentile95", self->opLatencies[OP_GETREADVERSION].percentile(.95)).detail("Count", self->opCounters[OP_GETREADVERSION].getValue());
|
||||
|
||||
std::string ts = format("T=%04.0fs: ", elapsed);
|
||||
self->periodicMetrics.push_back(PerfMetric(ts + "Transactions/sec", (self->xacts.getValue() - last_xacts) / self->periodicLoggingInterval, false));
|
||||
self->periodicMetrics.push_back(PerfMetric(ts + "Operations/sec", (self->totalOps.getValue() - last_ops) / self->periodicLoggingInterval, false));
|
||||
|
||||
last_xacts = self->xacts.getValue();
|
||||
last_ops = self->totalOps.getValue();
|
||||
}
|
||||
}
|
||||
ACTOR Future<Void> _setup(Database cx, MakoWorkload* self) {
|
||||
|
||||
state Promise<double> loadTime;
|
||||
state Promise<std::vector<std::pair<uint64_t, double>>> ratesAtKeyCounts;
|
||||
|
||||
wait(bulkSetup(cx, self, self->rowCount, loadTime, self->insertionCountsToMeasure.empty(), self->warmingDelay,
|
||||
self->maxInsertRate, self->insertionCountsToMeasure, ratesAtKeyCounts));
|
||||
|
||||
// This is the setup time
|
||||
self->loadTime = loadTime.getFuture().get();
|
||||
// This is the rates of importing keys
|
||||
self->ratesAtKeyCounts = ratesAtKeyCounts.getFuture().get();
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _start(Database cx, MakoWorkload* self) {
|
||||
// TODO: Do I need to read data to warm the cache of the keySystem like ReadWrite.actor.cpp (line 465)?
|
||||
if (self->runBenchmark) {
|
||||
wait(self->_runBenchmark(cx, self));
|
||||
}
|
||||
if (!self->preserveData && self->clientId == 0){
|
||||
wait(self->cleanup(cx, self));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> _runBenchmark(Database cx, MakoWorkload* self){
|
||||
std::vector<Future<Void>> clients;
|
||||
for (int c = 0; c < self->actorCountPerClient; ++c) {
|
||||
clients.push_back(self->makoClient(cx, self, self->actorCountPerClient / self->transactionsPerSecond, c));
|
||||
}
|
||||
|
||||
if (self->enableLogging)
|
||||
clients.push_back(tracePeriodically(self));
|
||||
|
||||
wait( timeout( waitForAll( clients ), self->testDuration, Void() ) );
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> makoClient(Database cx, MakoWorkload* self, double delay, int actorIndex) {
|
||||
|
||||
state Key rkey, rkey2;
|
||||
state Value rval;
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
state bool doCommit;
|
||||
state int i, count;
|
||||
state uint64_t range, indBegin, indEnd, rangeLen;
|
||||
state double lastTime = now();
|
||||
state double commitStart;
|
||||
state KeyRangeRef rkeyRangeRef;
|
||||
state std::vector<int> perOpCount(MAX_OP, 0);
|
||||
|
||||
TraceEvent("ClientStarting").detail("ActorIndex", actorIndex).detail("ClientIndex", self->clientId).detail("NumActors", self->actorCountPerClient);
|
||||
|
||||
loop {
|
||||
// used for throttling
|
||||
wait(poisson(&lastTime, delay));
|
||||
try{
|
||||
// user-defined value: whether commit read-only ops or not; default is false
|
||||
doCommit = self->commitGet;
|
||||
for (i = 0; i < MAX_OP; ++i) {
|
||||
if (i == OP_COMMIT)
|
||||
continue;
|
||||
for (count = 0; count < self->operations[i][0]; ++count) {
|
||||
range = std::min(RANGELIMIT, self->operations[i][1]);
|
||||
rangeLen = digits(range);
|
||||
// generate random key-val pair for operation
|
||||
indBegin = self->getRandomKey(self->rowCount);
|
||||
rkey = self->keyForIndex(indBegin);
|
||||
rval = self->randomValue();
|
||||
indEnd = std::min(indBegin + range, self->rowCount);
|
||||
rkey2 = self->keyForIndex(indEnd);
|
||||
// KeyRangeRef(min, maxPlusOne)
|
||||
rkeyRangeRef = KeyRangeRef(rkey, rkey2);
|
||||
|
||||
if (i == OP_GETREADVERSION){
|
||||
wait(logLatency(tr.getReadVersion(), &self->opLatencies[i]));
|
||||
}
|
||||
else if (i == OP_GET){
|
||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[i]));
|
||||
} else if (i == OP_GETRANGE){
|
||||
wait(logLatency(tr.getRange(rkeyRangeRef, RANGELIMIT, false), &self->opLatencies[i]));
|
||||
}
|
||||
else if (i == OP_SGET){
|
||||
wait(logLatency(tr.get(rkey, true), &self->opLatencies[i]));
|
||||
} else if (i == OP_SGETRANGE){
|
||||
//do snapshot get range here
|
||||
wait(logLatency(tr.getRange(rkeyRangeRef, RANGELIMIT, true), &self->opLatencies[i]));
|
||||
} else if (i == OP_UPDATE){
|
||||
wait(logLatency(tr.get(rkey, false), &self->opLatencies[OP_GET]));
|
||||
tr.set(rkey, rval);
|
||||
doCommit = true;
|
||||
} else if (i == OP_INSERT){
|
||||
// generate an (almost) unique key here, it starts with 'mako' and then comes with randomly generated characters
|
||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
tr.set(rkey, rval);
|
||||
doCommit = true;
|
||||
} else if (i == OP_INSERTRANGE){
|
||||
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
for (int range_i = 0; range_i < range; ++range_i){
|
||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||
tr.set(rkey, self->randomValue());
|
||||
}
|
||||
doCommit = true;
|
||||
} else if (i == OP_CLEAR){
|
||||
tr.clear(rkey);
|
||||
doCommit = true;
|
||||
} else if(i == OP_SETCLEAR){
|
||||
randStr(reinterpret_cast<char*>(mutateString(rkey)) + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
tr.set(rkey, rval);
|
||||
// commit the change and update metrics
|
||||
commitStart = now();
|
||||
wait(tr.commit());
|
||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
++perOpCount[OP_COMMIT];
|
||||
tr.reset();
|
||||
tr.clear(rkey);
|
||||
doCommit = true;
|
||||
} else if (i == OP_CLEARRANGE){
|
||||
tr.clear(rkeyRangeRef);
|
||||
doCommit = true;
|
||||
} else if (i == OP_SETCLEARRANGE){
|
||||
char *rkeyPtr = reinterpret_cast<char*>(mutateString(rkey));
|
||||
randStr(rkeyPtr + self->KEYPREFIXLEN, self->keyBytes-self->KEYPREFIXLEN);
|
||||
state std::string scr_start_key;
|
||||
state std::string scr_end_key;
|
||||
for (int range_i = 0; range_i < range; ++range_i){
|
||||
format("%0.*d", rangeLen, range_i).copy(rkeyPtr + self->keyBytes - rangeLen, rangeLen);
|
||||
tr.set(rkey, self->randomValue());
|
||||
if (range_i == 0)
|
||||
scr_start_key = rkey.toString();
|
||||
}
|
||||
scr_end_key = rkey.toString();
|
||||
commitStart = now();
|
||||
wait(tr.commit());
|
||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
++perOpCount[OP_COMMIT];
|
||||
tr.reset();
|
||||
tr.clear(KeyRangeRef(StringRef(scr_start_key), StringRef(scr_end_key)));
|
||||
doCommit = true;
|
||||
}
|
||||
++perOpCount[i];
|
||||
}
|
||||
}
|
||||
|
||||
if (doCommit) {
|
||||
commitStart = now();
|
||||
wait(tr.commit());
|
||||
self->opLatencies[OP_COMMIT].addSample(now() - commitStart);
|
||||
++perOpCount[OP_COMMIT];
|
||||
}
|
||||
// successfully finish the transaction, update metrics
|
||||
++self->xacts;
|
||||
for (int op = 0; op < MAX_OP; ++op){
|
||||
self->opCounters[op] += perOpCount[op];
|
||||
self->totalOps += perOpCount[op];
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent("FailedToExecOperations").error(e);
|
||||
if (e.code() == error_code_operation_cancelled)
|
||||
throw;
|
||||
else if (e.code() == error_code_not_committed)
|
||||
++self->conflicts;
|
||||
|
||||
wait(tr.onError(e));
|
||||
++self->retries;
|
||||
}
|
||||
// reset all the operations' counters to 0
|
||||
std::fill(perOpCount.begin(), perOpCount.end(), 0);
|
||||
tr.reset();
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> cleanup(Database cx, MakoWorkload* self){
|
||||
// clear all data starts with 'mako' in the database
|
||||
state std::string keyPrefix(self->KEYPREFIX);
|
||||
state ReadYourWritesTransaction tr(cx);
|
||||
|
||||
loop{
|
||||
try {
|
||||
tr.clear(prefixRange(keyPrefix));
|
||||
wait(tr.commit());
|
||||
break;
|
||||
} catch (Error &e){
|
||||
TraceEvent("FailedToCleanData").error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
ACTOR template<class T>
|
||||
static Future<Void> logLatency(Future<T> f, ContinuousSample<double>* opLatencies){
|
||||
state double opBegin = now();
|
||||
T value = wait(f);
|
||||
opLatencies->addSample(now() - opBegin);
|
||||
return Void();
|
||||
}
|
||||
|
||||
int64_t getRandomKey(uint64_t rowCount) {
|
||||
// TODO: support other distribution like zipf
|
||||
return deterministicRandom()->randomInt64(0, rowCount);
|
||||
}
|
||||
void parseOperationsSpec() {
|
||||
const char *ptr = operationsSpec.c_str();
|
||||
int op = 0;
|
||||
int rangeop = 0;
|
||||
int num;
|
||||
int error = 0;
|
||||
|
||||
for (op = 0; op < MAX_OP; op++) {
|
||||
operations[op][OP_COUNT] = 0;
|
||||
operations[op][OP_RANGE] = 0;
|
||||
}
|
||||
|
||||
op = 0;
|
||||
while (*ptr) {
|
||||
if (strncmp(ptr, "grv", 3) == 0) {
|
||||
op = OP_GETREADVERSION;
|
||||
ptr += 3;
|
||||
} else if (strncmp(ptr, "gr", 2) == 0) {
|
||||
op = OP_GETRANGE;
|
||||
rangeop = 1;
|
||||
ptr += 2;
|
||||
} else if (strncmp(ptr, "g", 1) == 0) {
|
||||
op = OP_GET;
|
||||
ptr++;
|
||||
} else if (strncmp(ptr, "sgr", 3) == 0) {
|
||||
op = OP_SGETRANGE;
|
||||
rangeop = 1;
|
||||
ptr += 3;
|
||||
} else if (strncmp(ptr, "sg", 2) == 0) {
|
||||
op = OP_SGET;
|
||||
ptr += 2;
|
||||
} else if (strncmp(ptr, "u", 1) == 0) {
|
||||
op = OP_UPDATE;
|
||||
ptr++;
|
||||
} else if (strncmp(ptr, "ir", 2) == 0) {
|
||||
op = OP_INSERTRANGE;
|
||||
rangeop = 1;
|
||||
ptr += 2;
|
||||
} else if (strncmp(ptr, "i", 1) == 0) {
|
||||
op = OP_INSERT;
|
||||
ptr++;
|
||||
} else if (strncmp(ptr, "cr", 2) == 0) {
|
||||
op = OP_CLEARRANGE;
|
||||
rangeop = 1;
|
||||
ptr += 2;
|
||||
} else if (strncmp(ptr, "c", 1) == 0) {
|
||||
op = OP_CLEAR;
|
||||
ptr++;
|
||||
} else if (strncmp(ptr, "scr", 3) == 0) {
|
||||
op = OP_SETCLEARRANGE;
|
||||
rangeop = 1;
|
||||
ptr += 3;
|
||||
} else if (strncmp(ptr, "sc", 2) == 0) {
|
||||
op = OP_SETCLEAR;
|
||||
ptr += 2;
|
||||
} else {
|
||||
error = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
/* count */
|
||||
num = 0;
|
||||
if ((*ptr < '0') || (*ptr > '9')) {
|
||||
num = 1; /* if omitted, set it to 1 */
|
||||
} else {
|
||||
while ((*ptr >= '0') && (*ptr <= '9')) {
|
||||
num = num * 10 + *ptr - '0';
|
||||
ptr++;
|
||||
}
|
||||
}
|
||||
/* set count */
|
||||
operations[op][OP_COUNT] = num;
|
||||
|
||||
if (rangeop) {
|
||||
if (*ptr != ':') {
|
||||
error = 1;
|
||||
break;
|
||||
} else {
|
||||
ptr++; /* skip ':' */
|
||||
num = 0;
|
||||
if ((*ptr < '0') || (*ptr > '9')) {
|
||||
error = 1;
|
||||
break;
|
||||
}
|
||||
while ((*ptr >= '0') && (*ptr <= '9')) {
|
||||
num = num * 10 + *ptr - '0';
|
||||
ptr++;
|
||||
}
|
||||
/* set range */
|
||||
if (num > RANGELIMIT)
|
||||
TraceEvent(SevError, "RangeExceedLimit").detail("RangeLimit", RANGELIMIT).detail("Range", num);
|
||||
operations[op][OP_RANGE] = num;
|
||||
}
|
||||
}
|
||||
rangeop = 0;
|
||||
}
|
||||
|
||||
if (error) {
|
||||
TraceEvent(SevError, "InvalidTransactionSpecification").detail("operations", operationsSpec);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
WorkloadFactory<MakoWorkload> MakoloadFactory("Mako");
|
|
@ -89,6 +89,9 @@ elseif(WIN32)
|
|||
endif()
|
||||
target_link_libraries(flow PRIVATE ${FLOW_LIBS})
|
||||
target_link_libraries(flow PUBLIC boost_target Threads::Threads ${CMAKE_DL_LIBS})
|
||||
if(USE_VALGRIND)
|
||||
target_link_libraries(flow PUBLIC Valgrind)
|
||||
endif()
|
||||
if(NOT WITH_TLS)
|
||||
target_compile_definitions(flow PUBLIC TLS_DISABLED)
|
||||
else()
|
||||
|
|
|
@ -127,6 +127,8 @@ void setThreadLocalDeterministicRandomSeed(uint32_t seed);
|
|||
|
||||
// Returns the random number generator that can be seeded. This generator should only
|
||||
// be used in contexts where the choice to call it is deterministic.
|
||||
//
|
||||
// This generator is only deterministic if given a seed using setThreadLocalDeterministicRandomSeed
|
||||
Reference<IRandom> deterministicRandom();
|
||||
|
||||
// A random number generator that cannot be manually seeded and may be called in
|
||||
|
|
|
@ -42,7 +42,6 @@
|
|||
#include <sys/types.h>
|
||||
#include <time.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/wait.h>
|
||||
#include <fcntl.h>
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/FaultInjection.h"
|
||||
|
@ -2844,28 +2843,6 @@ void setupSlowTaskProfiler() {
|
|||
#endif
|
||||
}
|
||||
|
||||
#ifdef __linux__
|
||||
// There's no good place to put this, so it's here.
|
||||
// Ubuntu's packaging of libstdc++_pic offers different symbols than libstdc++. Go figure.
|
||||
// Notably, it's missing a definition of std::istream::ignore(long), which causes compilation errors
|
||||
// in the bindings. Thus, we provide weak versions of their definitions, so that if the
|
||||
// linked-against libstdc++ is missing their definitions, we'll be able to use the provided
|
||||
// ignore(long, int) version.
|
||||
#include <istream>
|
||||
namespace std {
|
||||
typedef basic_istream<char, std::char_traits<char>> char_basic_istream;
|
||||
template <>
|
||||
char_basic_istream& __attribute__((weak)) char_basic_istream::ignore(streamsize count) {
|
||||
return ignore(count, std::char_traits<char>::eof());
|
||||
}
|
||||
typedef basic_istream<wchar_t, std::char_traits<wchar_t>> wchar_basic_istream;
|
||||
template <>
|
||||
wchar_basic_istream& __attribute__((weak)) wchar_basic_istream::ignore(streamsize count) {
|
||||
return ignore(count, std::char_traits<wchar_t>::eof());
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// UnitTest for getMemoryInfo
|
||||
#ifdef __linux__
|
||||
TEST_CASE("/flow/Platform/getMemoryInfo") {
|
||||
|
|
|
@ -331,11 +331,11 @@ struct TraceableStringImpl : std::true_type {
|
|||
result.push_back('\\');
|
||||
result.push_back('\\');
|
||||
} else {
|
||||
const uint8_t byte = *iter;
|
||||
result.push_back('\\');
|
||||
result.push_back('x');
|
||||
// In order to handle negative values properly, wrap around the top (256)
|
||||
result.push_back(base16Char((*iter < 0 ? *iter + 256 : *iter) / 16));
|
||||
result.push_back(base16Char(*iter));
|
||||
result.push_back(base16Char(byte / 16));
|
||||
result.push_back(base16Char(byte));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -37,7 +37,7 @@ void setThreadLocalDeterministicRandomSeed(uint32_t seed) {
|
|||
|
||||
Reference<IRandom> deterministicRandom() {
|
||||
if(!seededRandom) {
|
||||
seededRandom = Reference<IRandom>(new DeterministicRandom(1, true));
|
||||
seededRandom = Reference<IRandom>(new DeterministicRandom(platform::getRandomSeed(), true));
|
||||
}
|
||||
return seededRandom;
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
|
||||
<Product Name='$(var.Title)'
|
||||
Id='{311BF306-11DD-487C-B2BC-D2A1D85DFCA3}'
|
||||
Id='{03A2BD79-EF30-41BA-A6D3-8563B7D46810}'
|
||||
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
|
||||
Version='$(var.Version)'
|
||||
Manufacturer='$(var.Manufacturer)'
|
||||
|
|
|
@ -47,6 +47,7 @@ add_fdb_test(TEST_FILES DataDistributionMetrics.txt IGNORE)
|
|||
add_fdb_test(TEST_FILES DiskDurability.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES FileSystem.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES Happy.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES Mako.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES IncrementalDelete.txt IGNORE)
|
||||
add_fdb_test(TEST_FILES KVStoreMemTest.txt UNIT IGNORE)
|
||||
add_fdb_test(TEST_FILES KVStoreReadMostly.txt UNIT IGNORE)
|
||||
|
@ -99,6 +100,7 @@ add_fdb_test(TEST_FILES fast/IncrementTest.txt)
|
|||
add_fdb_test(TEST_FILES fast/InventoryTestAlmostReadOnly.txt)
|
||||
add_fdb_test(TEST_FILES fast/InventoryTestSomeWrites.txt)
|
||||
add_fdb_test(TEST_FILES fast/KillRegionCycle.txt)
|
||||
add_fdb_test(TEST_FILES fast/LocalRatekeeper.txt)
|
||||
add_fdb_test(TEST_FILES fast/LongStackWriteDuringRead.txt)
|
||||
add_fdb_test(TEST_FILES fast/LowLatency.txt)
|
||||
add_fdb_test(TEST_FILES fast/MemoryLifetime.txt)
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
testTitle=MakoTest
|
||||
testName=Mako
|
||||
testDuration=10.0
|
||||
transactionsPerSecond=100000
|
||||
rows=1000000
|
||||
sampleSize=100
|
||||
valueBytes=16
|
||||
keyBytes=16
|
||||
operations=g5gr5:10i5ir5:10grv5
|
||||
actorCountPerClient=256
|
||||
enableLogging=false
|
||||
commitGet=false
|
||||
populateData=true
|
||||
runBenchmark=true
|
||||
preserveData=true
|
|
@ -0,0 +1,8 @@
|
|||
testTitle=LocalRateKeeper
|
||||
testName=LocalRatekeeper
|
||||
startAfter=60.0
|
||||
blockWritesFor=80.0
|
||||
|
||||
testName=Cycle
|
||||
transactionsPerSecond=25
|
||||
testDuration=200
|
Loading…
Reference in New Issue