Merge branch 'main' of github.com:apple/foundationdb into tenant-restarting-tests

This commit is contained in:
Jon Fu 2022-10-12 11:59:27 -07:00
commit a3d42c6073
35 changed files with 1495 additions and 244 deletions

View File

@ -56,7 +56,7 @@ endfunction()
# all these tests in serialized order and within the same directory. This is
# useful for restart tests
function(add_fdb_test)
set(options UNIT IGNORE)
set(options UNIT IGNORE LONG_RUNNING)
set(oneValueArgs TEST_NAME TIMEOUT)
set(multiValueArgs TEST_FILES)
cmake_parse_arguments(ADD_FDB_TEST "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
@ -106,6 +106,9 @@ function(add_fdb_test)
if(ADD_FDB_TEST_UNIT)
message(STATUS
"ADDING UNIT TEST ${assigned_id} ${test_name}")
elseif(ADD_FDB_TEST_LONG_RUNNING)
message(STATUS
"ADDING LONG RUNNING TEST ${assigned_id} ${test_name}")
else()
message(STATUS
"ADDING SIMULATOR TEST ${assigned_id} ${test_name}")
@ -150,9 +153,15 @@ function(add_fdb_test)
endif()
endif()
# set variables used for generating test packages
set(TEST_NAMES ${TEST_NAMES} ${test_name} PARENT_SCOPE)
set(TEST_FILES_${test_name} ${ADD_FDB_TEST_TEST_FILES} PARENT_SCOPE)
set(TEST_TYPE_${test_name} ${test_type} PARENT_SCOPE)
if(ADD_FDB_TEST_LONG_RUNNING)
set(LONG_RUNNING_TEST_NAMES ${LONG_RUNNING_TEST_NAMES} ${test_name} PARENT_SCOPE)
set(LONG_RUNNING_TEST_FILES_${test_name} ${ADD_FDB_TEST_TEST_FILES} PARENT_SCOPE)
set(LONG_RUNNING_TEST_TYPE_${test_name} ${test_type} PARENT_SCOPE)
else()
set(TEST_NAMES ${TEST_NAMES} ${test_name} PARENT_SCOPE)
set(TEST_FILES_${test_name} ${ADD_FDB_TEST_TEST_FILES} PARENT_SCOPE)
set(TEST_TYPE_${test_name} ${test_type} PARENT_SCOPE)
endif()
endfunction()
if(NOT WIN32)
@ -167,14 +176,21 @@ endif()
# - OUT_DIR the directory where files will be staged
# - CONTEXT the type of correctness package being built (e.g. 'valgrind correctness')
function(stage_correctness_package)
set(options LONG_RUNNING)
set(oneValueArgs OUT_DIR CONTEXT OUT_FILES)
cmake_parse_arguments(STAGE "" "${oneValueArgs}" "" "${ARGN}")
set(multiValueArgs TEST_LIST)
cmake_parse_arguments(STAGE "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
file(MAKE_DIRECTORY ${STAGE_OUT_DIR}/bin)
string(LENGTH "${CMAKE_SOURCE_DIR}/tests/" base_length)
foreach(test IN LISTS TEST_NAMES)
foreach(test IN LISTS STAGE_TEST_LIST)
if((${test} MATCHES ${TEST_PACKAGE_INCLUDE}) AND
(NOT ${test} MATCHES ${TEST_PACKAGE_EXCLUDE}))
foreach(file IN LISTS TEST_FILES_${test})
string(LENGTH "${CMAKE_SOURCE_DIR}/tests/" base_length)
if(STAGE_LONG_RUNNING)
set(TEST_FILES_PREFIX "LONG_RUNNING_TEST_FILES")
else()
set(TEST_FILES_PREFIX "TEST_FILES")
endif()
foreach(file IN LISTS ${TEST_FILES_PREFIX}_${test})
string(SUBSTRING ${file} ${base_length} -1 rel_out_file)
set(out_file ${STAGE_OUT_DIR}/tests/${rel_out_file})
list(APPEND test_files ${out_file})
@ -265,7 +281,7 @@ function(create_correctness_package)
return()
endif()
set(out_dir "${CMAKE_BINARY_DIR}/correctness")
stage_correctness_package(OUT_DIR ${out_dir} CONTEXT "correctness" OUT_FILES package_files)
stage_correctness_package(OUT_DIR ${out_dir} CONTEXT "correctness" OUT_FILES package_files TEST_LIST "${TEST_NAMES}")
set(tar_file ${CMAKE_BINARY_DIR}/packages/correctness-${FDB_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}
@ -294,13 +310,47 @@ function(create_correctness_package)
add_dependencies(package_tests_u package_tests)
endfunction()
# Build the long-running correctness test package (Joshua tarball).
#
# Stages the tests registered via add_fdb_test(... LONG_RUNNING) together with
# the required binaries, wraps the Joshua test/timeout scripts, and produces
# both a versioned and an unversioned tar.gz under ${CMAKE_BINARY_DIR}/packages.
# No-op on Windows.
#
# Fixes over the previous revision: VERBATIM is passed to add_custom_command so
# argument escaping is platform-independent, and path-valued variable
# expansions are quoted so paths containing spaces do not split into multiple
# arguments.
function(create_long_running_correctness_package)
  if(WIN32)
    return()
  endif()
  set(out_dir "${CMAKE_BINARY_DIR}/long_running_correctness")
  # Stage only the long-running test list (LONG_RUNNING selects the
  # LONG_RUNNING_TEST_FILES_* variables inside stage_correctness_package).
  stage_correctness_package(
    OUT_DIR "${out_dir}"
    CONTEXT "long running correctness"
    OUT_FILES package_files
    TEST_LIST "${LONG_RUNNING_TEST_NAMES}"
    LONG_RUNNING)
  set(tar_file "${CMAKE_BINARY_DIR}/packages/long-running-correctness-${FDB_VERSION}.tar.gz")
  add_custom_command(
    OUTPUT "${tar_file}"
    DEPENDS ${package_files}
            "${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTest.sh"
            "${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh"
    COMMAND ${CMAKE_COMMAND} -E copy "${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTest.sh"
            "${out_dir}/joshua_test"
    COMMAND ${CMAKE_COMMAND} -E copy "${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh"
            "${out_dir}/joshua_timeout"
    COMMAND ${CMAKE_COMMAND} -E tar cfz "${tar_file}" ${package_files}
            "${out_dir}/joshua_test"
            "${out_dir}/joshua_timeout"
    WORKING_DIRECTORY "${out_dir}"
    COMMENT "Package long running correctness archive"
    VERBATIM)
  add_custom_target(package_long_running_tests ALL DEPENDS "${tar_file}")
  # The stripped fdbserver binary and TestHarness must exist before packaging.
  add_dependencies(package_long_running_tests strip_only_fdbserver TestHarness)
  # Also publish a stable, unversioned file name for CI consumers.
  set(unversioned_tar_file "${CMAKE_BINARY_DIR}/packages/long_running_correctness.tar.gz")
  add_custom_command(
    OUTPUT "${unversioned_tar_file}"
    DEPENDS "${tar_file}"
    COMMAND ${CMAKE_COMMAND} -E copy "${tar_file}" "${unversioned_tar_file}"
    COMMENT "Copy long running correctness package to ${unversioned_tar_file}"
    VERBATIM)
  add_custom_target(package_long_running_tests_u DEPENDS "${unversioned_tar_file}")
  add_dependencies(package_long_running_tests_u package_long_running_tests)
endfunction()
function(create_valgrind_correctness_package)
if(WIN32)
return()
endif()
if(USE_VALGRIND)
set(out_dir "${CMAKE_BINARY_DIR}/valgrind_correctness")
stage_correctness_package(OUT_DIR ${out_dir} CONTEXT "valgrind correctness" OUT_FILES package_files)
stage_correctness_package(OUT_DIR ${out_dir} CONTEXT "valgrind correctness" OUT_FILES package_files TEST_LIST "${TEST_NAMES}")
set(tar_file ${CMAKE_BINARY_DIR}/packages/valgrind-${FDB_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}

View File

@ -26,6 +26,7 @@ env_set(TRACE_PC_GUARD_INSTRUMENTATION_LIB "" STRING "Path to a library containi
env_set(PROFILE_INSTR_GENERATE OFF BOOL "If set, build FDB as an instrumentation build to generate profiles")
env_set(PROFILE_INSTR_USE "" STRING "If set, build FDB with profile")
env_set(FULL_DEBUG_SYMBOLS OFF BOOL "Generate full debug symbols")
env_set(ENABLE_LONG_RUNNING_TESTS OFF BOOL "Add a long running tests package")
set(USE_SANITIZER OFF)
if(USE_ASAN OR USE_VALGRIND OR USE_MSAN OR USE_TSAN OR USE_UBSAN)
@ -291,6 +292,19 @@ else()
# for more information.
#add_compile_options(-fno-builtin-memcpy)
if (USE_LIBCXX)
# Make sure that libc++ can be found by the platform's loader, so that things like cmake's "try_run" work.
find_library(LIBCXX_SO_PATH c++ /usr/local/lib)
if (LIBCXX_SO_PATH)
get_filename_component(LIBCXX_SO_DIR ${LIBCXX_SO_PATH} DIRECTORY)
if (APPLE)
set(ENV{DYLD_LIBRARY_PATH} "$ENV{DYLD_LIBRARY_PATH}:${LIBCXX_SO_DIR}")
else()
set(ENV{LD_LIBRARY_PATH} "$ENV{LD_LIBRARY_PATH}:${LIBCXX_SO_DIR}")
endif()
endif()
endif()
if (CLANG OR ICX)
if (APPLE OR USE_LIBCXX)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
@ -298,19 +312,6 @@ else()
if (STATIC_LINK_LIBCXX)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -nostdlib++ -Wl,-Bstatic -lc++ -lc++abi -Wl,-Bdynamic")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -static-libgcc -nostdlib++ -Wl,-Bstatic -lc++ -lc++abi -Wl,-Bdynamic")
else()
# Make sure that libc++ can be found be the platform's loader, so that thing's like cmake's "try_run" work.
find_library(LIBCXX_SO_PATH c++ /usr/local/lib)
if (LIBCXX_SO_PATH)
get_filename_component(LIBCXX_SO_DIR ${LIBCXX_SO_PATH} DIRECTORY)
if (APPLE)
set(ENV{DYLD_LIBRARY_PATH} "$ENV{DYLD_LIBRARY_PATH}:${LIBCXX_SO_DIR}")
elseif(WIN32)
set(ENV{PATH} "$ENV{PATH};${LIBCXX_SO_DIR}")
else()
set(ENV{LD_LIBRARY_PATH} "$ENV{LD_LIBRARY_PATH}:${LIBCXX_SO_DIR}")
endif()
endif()
endif()
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -stdlib=libc++ -Wl,-build-id=sha1")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -stdlib=libc++ -Wl,-build-id=sha1")

View File

@ -128,6 +128,35 @@ set_knob(db, 'min_trace_severity', '10', None, 'description')
set_knob(db, 'min_trace_severity', '20', 'az-1', 'description')
```
### CLI Usage
Users may also utilize `fdbcli` to set and update knobs dynamically. Usage is as follows
```
setknob <knob_name> <knob_value> [config_class]
getknob <knob_name> [config_class]
```
Where `knob_name` is an existing knob, `knob_value` is the desired value to set for the knob, and `config_class` is the optional configuration class. Furthermore, `setknob` may be combined within a `begin`/`commit` block to update multiple knobs atomically. If using this option, a description must follow `commit`, otherwise a prompt will be shown asking for a description. The description must be non-empty. An example follows.
```
begin
setknob min_trace_severity 30
setknob tracing_udp_listener_addr 192.168.0.1
commit "fdbcli change"
```
Users may only combine knob configuration changes with other knob configuration changes in the same transaction. For example, the following is not permitted and will raise an error.
```
begin
set foo bar
setknob max_metric_size 1000
commit "change"
```
Specifically, `set, clear, get, getrange, clearrange` cannot be combined in any transaction with a `setknob` or `getknob`.
If using an individual `setknob` without being inside a `begin`/`commit` block, then `fdbcli` will prompt for a description as well.
#### Type checking
Knobs have implicit types attached to them when defined. For example, the knob `tracing_udp_listener_addr` is set to `"127.0.0.1"` and so its type is string. If a user invokes `setknob` on this knob with an incorrect value that is not a string, the transaction will fail.
### Disable the Configuration Database
The configuration database includes both client and server changes and is

View File

@ -203,6 +203,13 @@ The ``get`` command fetches the value of a given key. Its syntax is ``get <KEY>`
Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (or values) in ``fdbcli``.
getknob
-------
The ``getknob`` command fetches the value of a given knob that has been populated by ``setknob``. Its syntax is ``getknob <KNOBNAME> [CONFIGCLASS]``. It displays the value of ``<KNOBNAME>`` if ``<KNOBNAME>`` is present in the database and ``not found`` otherwise.
Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (or values) in ``fdbcli``.
getrange
--------
@ -395,6 +402,13 @@ The ``setclass`` command can be used to change the :ref:`process class <guidelin
The available process classes are ``unset``, ``storage``, ``transaction``, ``resolution``, ``grv_proxy``, ``commit_proxy``, ``master``, ``test``, ``unset``, ``stateless``, ``log``, ``router``, ``cluster_controller``, ``fast_restore``, ``data_distributor``, ``coordinator``, ``ratekeeper``, ``storage_cache``, ``backup``, and ``default``.
setknob
-------
The ``setknob`` command can be used to set knobs dynamically. Its syntax is ``setknob <KNOBNAME> <KNOBVALUE> [CONFIGCLASS]``. If not inside a ``begin``/``commit`` block, the CLI will prompt for a description of the change.
Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (or values) in ``fdbcli``.
sleep
-----

View File

@ -499,11 +499,14 @@ void initHelp() {
"transaction, and are automatically committed for you. By explicitly beginning a transaction, "
"successive operations are all performed as part of a single transaction.\n\nTo commit the "
"transaction, use the commit command. To discard the transaction, use the reset command.");
helpMap["commit"] = CommandHelp("commit",
helpMap["commit"] = CommandHelp("commit [description]",
"commit the current transaction",
"Any sets or clears executed after the start of the current transaction will be "
"committed to the database. On success, the committed version number is displayed. "
"If commit fails, the error is displayed and the transaction must be retried.");
"If commit fails, the error is displayed and the transaction must be retried. The "
"command optionally allows for a description in case the transaction targets the "
"configuration database. If no description is provided in the command, a prompt "
"will be shown asking for a relevant description of the configuration change");
helpMap["clear"] = CommandHelp(
"clear <KEY>",
"clear a key from the database",
@ -552,6 +555,14 @@ void initHelp() {
helpMap["set"] = CommandHelp("set <KEY> <VALUE>",
"set a value for a given key",
"If KEY is not already present in the database, it will be created." ESCAPINGKV);
helpMap["setknob"] = CommandHelp("setknob <KEY> <VALUE> [CONFIG_CLASS]",
"updates a knob to specified value",
"setknob will prompt for a descrption of the changes" ESCAPINGKV);
helpMap["getknob"] = CommandHelp(
"getknob <KEY> [CONFIG_CLASS]", "gets the value of the specified knob", "CONFIG_CLASS is optional." ESCAPINGK);
helpMap["option"] = CommandHelp(
"option <STATE> <OPTION> <ARG>",
"enables or disables an option",
@ -1050,12 +1061,17 @@ Future<T> stopNetworkAfter(Future<T> what) {
}
}
enum TransType { Db = 0, Config, None };
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterConnectionFile> ccf) {
state LineNoise& linenoise = *plinenoise;
state bool intrans = false;
state TransType transtype = TransType::None;
state bool isCommitDesc = false;
state Database localDb;
state Reference<IDatabase> db;
state Reference<IDatabase> configDb;
state Reference<ITenant> tenant;
state Optional<TenantName> tenantName;
state Optional<TenantMapEntry> tenantEntry;
@ -1064,6 +1080,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
state const Reference<ITenant> managementTenant;
state Reference<ITransaction> tr;
state Reference<ITransaction> config_tr;
state Transaction trx;
state bool writeMode = false;
@ -1085,6 +1102,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printf("Using cluster file `%s'.\n", ccf->getLocation().c_str());
}
db = API->createDatabase(opt.clusterFile.c_str());
configDb = API->createDatabase(opt.clusterFile.c_str());
configDb->setOption(FDBDatabaseOptions::USE_CONFIG_DATABASE);
} catch (Error& e) {
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
printf("Unable to connect to cluster from `%s'\n", ccf->getLocation().c_str());
@ -1442,23 +1461,46 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
} else {
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
getTransaction(db, tenant, tr, options, false);
intrans = true;
transtype = TransType::None;
getTransaction(db, tenant, tr, options, false);
printf("Transaction started\n");
}
continue;
}
if (tokencmp(tokens[0], "commit")) {
if (tokens.size() != 1) {
if (tokens.size() > 2) {
printUsage(tokens[0]);
is_error = true;
} else if (!intrans) {
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
wait(commitTransaction(tr));
if (isCommitDesc && tokens.size() == 1) {
// prompt for description and add to txn
state Optional<std::string> raw;
while (!raw.present() || raw.get().empty()) {
fprintf(stdout,
"Please set a description for the change. Description must be non-empty.\n");
state Optional<std::string> rawline =
wait(makeInterruptable(linenoise.read("description: ")));
raw = rawline;
}
std::string line = raw.get();
config_tr->set("\xff\xff/description"_sr, line);
}
if (transtype == TransType::Db) {
wait(commitTransaction(tr));
} else {
if (tokens.size() > 1) {
config_tr->set("\xff\xff/description"_sr, tokens[1]);
}
wait(commitTransaction(config_tr));
}
isCommitDesc = false;
intrans = false;
transtype = TransType::None;
options = &globalOptions;
}
@ -1481,10 +1523,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
tr->reset();
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
options->apply(tr);
if (transtype == TransType::Config) {
config_tr->reset();
} else {
tr->reset();
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
options->apply(tr);
}
isCommitDesc = false;
transtype = TransType::None;
printf("Transaction reset\n");
}
continue;
@ -1510,6 +1558,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform get in configuration transaction\n");
is_error = true;
continue;
}
}
state ThreadFuture<Optional<Value>> valueF =
getTransaction(db, tenant, tr, options, intrans)->get(tokens[1]);
Optional<Standalone<StringRef>> v = wait(makeInterruptable(safeThreadFutureToFuture(valueF)));
@ -1618,7 +1675,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
} else {
state int limit;
bool valid = true;
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(
stderr,
"ERROR: Cannot perform getrange or getrangekeys in configuration transaction\n");
is_error = true;
continue;
}
}
if (tokens.size() == 4) {
// INT_MAX is 10 digits; rather than
// worrying about overflow we'll just cap
@ -1707,6 +1774,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform set in configuration transaction\n");
is_error = true;
continue;
}
}
getTransaction(db, tenant, tr, options, intrans);
tr->set(tokens[1], tokens[2]);
@ -1717,6 +1793,91 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
continue;
}
if (tokencmp(tokens[0], "setknob")) {
if (tokens.size() > 4 || tokens.size() < 3) {
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Config;
} else if (transtype == TransType::Db) {
fprintf(stderr, "ERROR: Cannot perform setknob in database transaction\n");
is_error = true;
isCommitDesc = false;
continue;
}
}
Tuple t;
if (tokens.size() == 4) {
t.append(tokens[3]);
} else {
t.appendNull();
}
t.append(tokens[1]);
getTransaction(configDb, tenant, config_tr, options, intrans);
config_tr->set(t.pack(), tokens[2]);
if (!intrans) {
// prompt for description and add to txn
state Optional<std::string> raw_desc;
while (!raw_desc.present() || raw_desc.get().empty()) {
fprintf(stdout,
"Please set a description for the change. Description must be non-empty\n");
state Optional<std::string> rawline_knob =
wait(makeInterruptable(linenoise.read("description: ")));
raw_desc = rawline_knob;
}
std::string line = raw_desc.get();
config_tr->set("\xff\xff/description"_sr, line);
wait(commitTransaction(config_tr));
} else {
isCommitDesc = true;
}
}
continue;
}
if (tokencmp(tokens[0], "getknob")) {
if (tokens.size() > 3 || tokens.size() < 2) {
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Config;
} else if (transtype == TransType::Db) {
fprintf(stderr, "ERROR: Cannot perform getknob in database transaction\n");
is_error = true;
continue;
}
}
Tuple t;
if (tokens.size() == 2) {
t.appendNull();
} else {
t.append(tokens[2]);
}
t.append(tokens[1]);
state ThreadFuture<Optional<Value>> valueF_knob =
getTransaction(configDb, tenant, config_tr, options, intrans)->get(t.pack());
Optional<Standalone<StringRef>> v =
wait(makeInterruptable(safeThreadFutureToFuture(valueF_knob)));
std::string knob_class = printable(tokens[1]);
if (tokens.size() == 3) {
std::string config_class = (" in configuration class " + printable(tokens[2]));
knob_class += config_class;
}
if (v.present())
printf("`%s' is `%s'\n",
knob_class.c_str(),
Tuple::tupleToString(Tuple::unpack(v.get())).c_str());
else
printf("`%s' is not found\n", knob_class.c_str());
}
continue;
}
if (tokencmp(tokens[0], "clear")) {
if (!writeMode) {
fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
@ -1728,6 +1889,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform clear in configuration transaction\n");
is_error = true;
continue;
}
}
getTransaction(db, tenant, tr, options, intrans);
tr->clear(tokens[1]);
@ -1749,6 +1919,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform clearrange in configuration transaction\n");
is_error = true;
continue;
}
}
getTransaction(db, tenant, tr, options, intrans);
tr->clear(KeyRangeRef(tokens[1], tokens[2]));
@ -1928,7 +2107,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
}
TraceEvent(SevInfo, "CLICommandLog", randomID).detail("Command", line).detail("IsError", is_error);
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw;

View File

@ -334,6 +334,57 @@ def consistencycheck(logger):
assert output3 == consistency_check_on_output
@enable_logging()
def knobmanagement(logger):
    # Exercise dynamic knob configuration through fdbcli:
    #   1. malformed invocations print usage
    #   2. unknown knob names / mistyped values fail at commit
    #   3. knob ops cannot be mixed with regular reads/writes in one txn
    #   4. a successful multi-knob transaction is visible via getknob
    # begin/commit is used throughout so fdbcli does not prompt for a
    # change description interactively.

    # 1. Incorrect arguments -> usage message.
    setknob_usage = "Usage: setknob <KEY> <VALUE> [CONFIG_CLASS]"
    assert run_fdbcli_command('setknob') == setknob_usage
    assert run_fdbcli_command('setknob', 'min_trace_severity') == setknob_usage
    assert run_fdbcli_command('getknob') == "Usage: getknob <KEY> [CONFIG_CLASS]"
    logger.debug("incorrect args passed")

    # 2a. A knob name that does not exist must produce an error.
    err = run_fdbcli_command_and_get_error('begin; setknob dummy_knob 20; commit \"fdbcli change\";')
    logger.debug("err is: {}".format(err))
    assert len(err) > 0
    logger.debug("invalid knob name passed")

    # 2b. A value of the wrong type for an existing knob must also fail.
    err = run_fdbcli_command_and_get_error('begin; setknob min_trace_severity dummy-text; commit \"fdbcli change\";')
    logger.debug("err is: {}".format(err))
    assert len(err) > 0
    logger.debug("invalid knob type passed")

    # 3. set/clear may not share a transaction with setknob.
    err = run_fdbcli_command_and_get_error('writemode on; begin; set foo bar; setknob max_metric_size 1000; commit;')
    logger.debug("err is: {}".format(err))
    assert len(err) > 0
    err = run_fdbcli_command_and_get_error('writemode on; begin; clear foo; setknob max_metric_size 1000; commit')
    logger.debug("err is: {}".format(err))
    assert len(err) > 0

    # 4. Commit several knobs atomically, then read each one back.
    output = run_fdbcli_command(
        'begin; setknob min_trace_severity 30; setknob max_metric_size 1000; '
        'setknob tracing_udp_listener_addr 192.168.0.1; '
        'setknob tracing_sample_rate 0.3; '
        'commit \"This is an fdbcli test for knobs\";')
    assert "Committed" in output
    expected_values = [
        ('min_trace_severity', r"`min_trace_severity' is `30'"),
        ('max_metric_size', r"`max_metric_size' is `1000'"),
        ('tracing_udp_listener_addr', r"`tracing_udp_listener_addr' is `'192.168.0.1''"),
        ('tracing_sample_rate', r"`tracing_sample_rate' is `0.300000'"),
    ]
    for knob, expected in expected_values:
        assert run_fdbcli_command('getknob', knob) == expected
@enable_logging()
def cache_range(logger):
# this command is currently experimental
@ -983,6 +1034,7 @@ if __name__ == '__main__':
versionepoch()
integer_options()
tls_address_suffix()
knobmanagement()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()

View File

@ -44,19 +44,20 @@ ConfigKey ConfigKeyRef::decodeKey(KeyRef const& key) {
}
Value KnobValueRef::ToValueFunc::operator()(int v) const {
return BinaryWriter::toValue(v, Unversioned());
// return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(int64_t v) const {
return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(bool v) const {
return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(ValueRef v) const {
return v;
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(double v) const {
return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
KnobValue KnobValueRef::CreatorFunc::operator()(NoKnobFound) const {

View File

@ -23,6 +23,7 @@
#include "fdbclient/BackupContainer.h"
#include "fdbclient/BlobCipher.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/GetEncryptCipherKeys.actor.h"
#include "fdbclient/JsonBuilder.h"
#include "fdbclient/KeyBackedTypes.h"
@ -649,10 +650,8 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
return Void();
}
ACTOR static Future<Void> updateEncryptionKeysCtx(EncryptedRangeFileWriter* self,
KeyRef key,
Reference<TenantEntryCache<Void>> cache) {
state std::pair<int64_t, TenantName> curTenantInfo = wait(getEncryptionDomainDetails(key, cache));
ACTOR static Future<Void> updateEncryptionKeysCtx(EncryptedRangeFileWriter* self, KeyRef key) {
state std::pair<int64_t, TenantName> curTenantInfo = wait(getEncryptionDomainDetails(key, self));
state Reference<AsyncVar<ClientDBInfo> const> dbInfo = self->cx->clientInfo;
// Get text and header cipher key
@ -694,13 +693,12 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
static bool isSystemKey(KeyRef key) { return key.size() && key[0] == systemKeys.begin[0]; }
ACTOR static Future<std::pair<int64_t, TenantName>> getEncryptionDomainDetailsImpl(
KeyRef key,
Reference<TenantEntryCache<Void>> tenantCache) {
ACTOR static Future<std::pair<int64_t, TenantName>>
getEncryptionDomainDetailsImpl(KeyRef key, Reference<TenantEntryCache<Void>> tenantCache, bool useTenantCache) {
if (isSystemKey(key)) {
return std::make_pair(SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_ID, FDB_SYSTEM_KEYSPACE_ENCRYPT_DOMAIN_NAME);
}
if (key.size() < TENANT_PREFIX_SIZE) {
if (key.size() < TENANT_PREFIX_SIZE || !useTenantCache) {
return std::make_pair(FDB_DEFAULT_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME);
}
KeyRef tenantPrefix = KeyRef(key.begin(), TENANT_PREFIX_SIZE);
@ -712,10 +710,21 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
return std::make_pair(FDB_DEFAULT_ENCRYPT_DOMAIN_ID, FDB_DEFAULT_ENCRYPT_DOMAIN_NAME);
}
static Future<std::pair<int64_t, TenantName>> getEncryptionDomainDetails(
KeyRef key,
Reference<TenantEntryCache<Void>> tenantCache) {
return getEncryptionDomainDetailsImpl(key, tenantCache);
static Future<std::pair<int64_t, TenantName>> getEncryptionDomainDetails(KeyRef key,
EncryptedRangeFileWriter* self) {
// If tenants are disabled on a cluster then don't use the TenantEntryCache as it will result in alot of
// unnecessary cache misses. For a cluster configured in TenantMode::Optional, the backup performance may
// degrade if most of the mutations belong to an invalid tenant
TenantMode mode = self->cx->clientInfo->get().tenantMode;
bool useTenantCache = mode != TenantMode::DISABLED;
if (g_network->isSimulated() && mode == TenantMode::OPTIONAL_TENANT) {
// TODO: Currently simulation tests run with optional tenant mode but most data does not belong to any
// tenant. This results in many timeouts so disable using the tenant cache until optional tenant mode
// support with backups is more performant
useTenantCache = false;
}
CODE_PROBE(useTenantCache, "using tenant cache");
return getEncryptionDomainDetailsImpl(key, self->tenantCache, useTenantCache);
}
// Handles the first block and internal blocks. Ends current block if needed.
@ -813,7 +822,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
appendStringRefWithLenToBuffer(self, &endKey);
appendStringRefWithLenToBuffer(self, &newValue);
wait(newBlock(self, 0, endKey, writeValue));
wait(updateEncryptionKeysCtx(self, self->lastKey, self->tenantCache));
wait(updateEncryptionKeysCtx(self, self->lastKey));
return Void();
}
@ -825,9 +834,8 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
if (self->lastKey.size() == 0 || k.size() == 0) {
return false;
}
state std::pair<int64_t, TenantName> curKeyTenantInfo = wait(getEncryptionDomainDetails(k, self->tenantCache));
state std::pair<int64_t, TenantName> prevKeyTenantInfo =
wait(getEncryptionDomainDetails(self->lastKey, self->tenantCache));
state std::pair<int64_t, TenantName> curKeyTenantInfo = wait(getEncryptionDomainDetails(k, self));
state std::pair<int64_t, TenantName> prevKeyTenantInfo = wait(getEncryptionDomainDetails(self->lastKey, self));
// crossing tenant boundaries so finish the current block using only the tenant prefix of the new key
if (curKeyTenantInfo.first != prevKeyTenantInfo.first) {
CODE_PROBE(true, "crossed tenant boundaries");
@ -840,7 +848,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
// Start a new block if needed, then write the key and value
ACTOR static Future<Void> writeKV_impl(EncryptedRangeFileWriter* self, Key k, Value v) {
if (!self->cipherKeys.headerCipherKey.isValid() || !self->cipherKeys.textCipherKey.isValid()) {
wait(updateEncryptionKeysCtx(self, k, self->tenantCache));
wait(updateEncryptionKeysCtx(self, k));
}
state int toWrite = sizeof(int32_t) + k.size() + sizeof(int32_t) + v.size();
wait(newBlockIfNeeded(self, toWrite));
@ -862,7 +870,7 @@ struct EncryptedRangeFileWriter : public IRangeFileWriter {
// TODO (Nim): Is it possible to write empty begin and end keys?
if (k.size() > 0 &&
(!self->cipherKeys.headerCipherKey.isValid() || !self->cipherKeys.textCipherKey.isValid())) {
wait(updateEncryptionKeysCtx(self, k, self->tenantCache));
wait(updateEncryptionKeysCtx(self, k));
}
// Need to account for extra "empty" value being written in the case of crossing tenant boundaries
@ -1035,8 +1043,7 @@ private:
ACTOR static Future<Void> decodeKVPairs(StringRefReader* reader,
Standalone<VectorRef<KeyValueRef>>* results,
bool encryptedBlock,
Optional<Database> cx,
Reference<TenantEntryCache<Void>> tenantCache) {
Optional<Database> cx) {
// Read begin key, if this fails then block was invalid.
state uint32_t kLen = reader->consumeNetworkUInt32();
state const uint8_t* k = reader->consume(kLen);
@ -1091,7 +1098,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
// BACKUP_AGENT_ENCRYPTED_SNAPSHOT_FILE_VERSION
int32_t file_version = reader.consume<int32_t>();
if (file_version == BACKUP_AGENT_SNAPSHOT_FILE_VERSION) {
wait(decodeKVPairs(&reader, &results, false, cx, Reference<TenantEntryCache<Void>>()));
wait(decodeKVPairs(&reader, &results, false, cx));
} else if (file_version == BACKUP_AGENT_ENCRYPTED_SNAPSHOT_FILE_VERSION) {
CODE_PROBE(true, "decoding encrypted block");
ASSERT(cx.present());
@ -1114,8 +1121,7 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
StringRef decryptedData =
wait(EncryptedRangeFileWriter::decrypt(cx.get(), header, dataPayloadStart, dataLen, &results.arena()));
reader = StringRefReader(decryptedData, restore_corrupted_data());
Reference<TenantEntryCache<Void>> tenantCache = makeReference<TenantEntryCache<Void>>(cx.get());
wait(decodeKVPairs(&reader, &results, true, cx, tenantCache));
wait(decodeKVPairs(&reader, &results, true, cx));
} else {
throw restore_unsupported_file_version();
}

View File

@ -32,6 +32,7 @@
#include <vector>
#include "boost/algorithm/string.hpp"
#include "flow/CodeProbe.h"
#include "fmt/format.h"
@ -48,6 +49,7 @@
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/GlobalConfig.actor.h"
#include "fdbclient/IKnobCollection.h"
@ -188,6 +190,8 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
TSSEndpointData(tssi.id(), tssi.getMappedKeyValues.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getKeyValuesStream.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getKeyValuesStream.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.changeFeedStream.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.changeFeedStream.getEndpoint(), metrics));
// non-data requests duplicated for load
queueModel.updateTssEndpoint(ssi.watchValue.getEndpoint().token.first(),
@ -198,6 +202,12 @@ void DatabaseContext::addTssMapping(StorageServerInterface const& ssi, StorageSe
TSSEndpointData(tssi.id(), tssi.getReadHotRanges.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.getRangeSplitPoints.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.getRangeSplitPoints.getEndpoint(), metrics));
queueModel.updateTssEndpoint(ssi.overlappingChangeFeeds.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.overlappingChangeFeeds.getEndpoint(), metrics));
// duplicated to ensure feed data cleanup
queueModel.updateTssEndpoint(ssi.changeFeedPop.getEndpoint().token.first(),
TSSEndpointData(tssi.id(), tssi.changeFeedPop.getEndpoint(), metrics));
}
}
@ -9013,6 +9023,273 @@ void DatabaseContext::setDesiredChangeFeedVersion(Version v) {
}
}
// Because two storage servers, depending on the shard map, can have different representations of a clear at the same
// version depending on their shard maps at the time of the mutation, it is non-trivial to directly compare change feed
// streams. Instead we compare the presence of data at each version. This both saves on the cpu cost of validation and
// is sufficient because historically most change feed corruption bugs are the absence of entire versions, not a
// subset of mutations within a version.
// State used to compare a storage server (SS) change feed stream against the duplicated
// stream from its testing storage server (TSS) pair. Only the set of versions carrying
// mutations is compared, not full mutation contents (see rationale comment above).
struct ChangeFeedTSSValidationData {
	// Versions at which the SS stream delivered non-empty mutations; drained by
	// changeFeedTSSValidator for comparison against the TSS stream.
	PromiseStream<Version> ssStreamSummary;
	// The duplicated request stream to the TSS pair.
	ReplyPromiseStream<ChangeFeedStreamReply> tssStream;
	// Holds the changeFeedTSSValidator actor; tied to the lifetime of this data.
	Future<Void> validatorFuture;
	// Rollback windows as (rolled-back-to version, rollback mutation version) pairs,
	// kept in ascending order (asserted in checkRollback).
	std::deque<std::pair<Version, Version>> rollbacks;
	// Highest pop version observed from either stream; versions below it are ignored.
	Version popVersion = invalidVersion;
	// Once set, no further data is summarized and the TSS stream has been destroyed.
	bool done = false;

	ChangeFeedTSSValidationData() {}
	ChangeFeedTSSValidationData(ReplyPromiseStream<ChangeFeedStreamReply> tssStream) : tssStream(tssStream) {}

	// Monotonically advance the highest pop version seen.
	void updatePopped(Version newPopVersion) { popVersion = std::max(popVersion, newPopVersion); }

	// Returns true iff m is a rollback marker: a single mutation whose param1 is
	// lastEpochEndPrivateKey. Records the (rollbackVersion, m.version) window, decoding
	// the rolled-back-to version from param2, unless it overlaps the last recorded window.
	bool checkRollback(const MutationsAndVersionRef& m) {
		if (m.mutations.size() == 1 && m.mutations.back().param1 == lastEpochEndPrivateKey) {
			if (rollbacks.empty() || rollbacks.back().second < m.version) {
				Version rollbackVersion;
				BinaryReader br(m.mutations.back().param2, Unversioned());
				br >> rollbackVersion;
				if (!rollbacks.empty()) {
					ASSERT(rollbacks.back().second <= rollbackVersion);
				}
				rollbacks.push_back({ rollbackVersion, m.version });
			}
			return true;
		} else {
			return false;
		}
	}

	// A version should be summarized only while validation is active, the reply actually
	// carries mutations, and it is not a rollback marker.
	bool shouldAddMutation(const MutationsAndVersionRef& m) {
		return !done && !m.mutations.empty() && !checkRollback(m);
	}

	// True iff v lies strictly inside the oldest recorded rollback window.
	bool isRolledBack(Version v) {
		return !rollbacks.empty() && rollbacks.front().first < v && rollbacks.front().second > v;
	}

	// Summarize an SS-side reply: advance the pop version and forward each qualifying
	// mutation version to the validator.
	void send(const ChangeFeedStreamReply& ssReply) {
		if (done) {
			return;
		}
		updatePopped(ssReply.popVersion);
		for (auto& it : ssReply.mutations) {
			if (shouldAddMutation(it)) {
				ssStreamSummary.send(it.version);
			}
		}
	}

	// Stop validation permanently.
	void complete() {
		done = true;
		// destroy TSS stream to stop server actor
		tssStream.reset();
	}
};
// Record a detected SS/TSS change feed stream divergence. Depending on knobs, either
// traces the full mismatch details, or traces a summary and stores the details in FDB
// keyed by a generated mismatch UID. Severity is SevError except under simulation with
// TSSMode::EnabledDropMutations, where mismatches are expected.
void handleTSSChangeFeedMismatch(const ChangeFeedStreamRequest& request,
const TSSEndpointData& tssData,
int64_t matchesFound,
Version lastMatchingVersion,
Version ssVersion,
Version tssVersion,
Version popVersion) {
	if (request.canReadPopped) {
		// There is a known issue where this can return different data between an SS and TSS when a feed was popped but
		// the SS restarted before the pop could be persisted, for reads that can read popped data. As such, only count
		// this as a mismatch when !req.canReadPopped
		return;
	}
	CODE_PROBE(true, "TSS mismatch in stream comparison");

	// Rate-limited: only a subset of mismatches get the detailed trace treatment.
	if (tssData.metrics->shouldRecordDetailedMismatch()) {
		TraceEvent mismatchEvent(
		(g_network->isSimulated() && g_simulator->tssMode == ISimulator::TSSMode::EnabledDropMutations)
		? SevWarnAlways
		: SevError,
		"TSSMismatchChangeFeedStream");
		mismatchEvent.setMaxEventLength(FLOW_KNOBS->TSS_LARGE_TRACE_SIZE);

		// request info
		mismatchEvent.detail("TSSID", tssData.tssId);
		mismatchEvent.detail("FeedID", request.rangeID);
		mismatchEvent.detail("BeginVersion", request.begin);
		mismatchEvent.detail("EndVersion", request.end);
		mismatchEvent.detail("StartKey", request.range.begin);
		mismatchEvent.detail("EndKey", request.range.end);
		mismatchEvent.detail("CanReadPopped", request.canReadPopped);
		mismatchEvent.detail("PopVersion", popVersion);
		mismatchEvent.detail("DebugUID", request.debugUID);

		// mismatch info
		mismatchEvent.detail("MatchesFound", matchesFound);
		mismatchEvent.detail("LastMatchingVersion", lastMatchingVersion);
		mismatchEvent.detail("SSVersion", ssVersion);
		mismatchEvent.detail("TSSVersion", tssVersion);

		CODE_PROBE(FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
		"Tracing Full TSS Feed Mismatch in stream comparison");
		CODE_PROBE(!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL,
		"Tracing Partial TSS Feed Mismatch in stream comparison and storing the rest in FDB");

		if (!FLOW_KNOBS->LOAD_BALANCE_TSS_MISMATCH_TRACE_FULL) {
			// Suppress the large event; persist its fields in FDB under mismatchUID and
			// record a summarized trace event instead.
			mismatchEvent.disable();
			UID mismatchUID = deterministicRandom()->randomUniqueID();
			tssData.metrics->recordDetailedMismatchData(mismatchUID, mismatchEvent.getFields().toString());

			// record a summarized trace event instead
			TraceEvent summaryEvent(
			(g_network->isSimulated() && g_simulator->tssMode == ISimulator::TSSMode::EnabledDropMutations)
			? SevWarnAlways
			: SevError,
			"TSSMismatchChangeFeedStream");
			summaryEvent.detail("TSSID", tssData.tssId)
			.detail("MismatchId", mismatchUID)
			.detail("FeedDebugUID", request.debugUID);
		}
	}
}
// Compares the per-version summaries of an SS change feed stream (via data->ssStreamSummary)
// against the duplicated TSS stream (via data->tssStream). Runs until either a mismatch is
// found (traced via handleTSSChangeFeedMismatch), both streams end, the TSS times out, or the
// actor is cancelled with the client stream it is tied to.
ACTOR Future<Void> changeFeedTSSValidator(ChangeFeedStreamRequest req,
Optional<ChangeFeedTSSValidationData>* data,
TSSEndpointData tssData) {
	state bool ssDone = false;
	state bool tssDone = false;
	state std::deque<Version> ssSummary;
	state std::deque<Version> tssSummary;

	ASSERT(data->present());
	state int64_t matchesFound = 0;
	state Version lastMatchingVersion = req.begin - 1;

	loop {
		// If SS stream gets error, whole stream data gets reset, so it's ok to cancel this actor
		if (!ssDone && ssSummary.empty()) {
			try {
				Version next = waitNext(data->get().ssStreamSummary.getFuture());
				ssSummary.push_back(next);
			} catch (Error& e) {
				if (e.code() != error_code_end_of_stream) {
					data->get().complete();
					if (e.code() != error_code_operation_cancelled) {
						tssData.metrics->ssError(e.code());
					}
					throw e;
				}
				ssDone = true;
				if (tssDone) {
					data->get().complete();
					return Void();
				}
			}
		}

		if (!tssDone && tssSummary.empty()) {
			try {
				choose {
					when(ChangeFeedStreamReply nextTss = waitNext(data->get().tssStream.getFuture())) {
						data->get().updatePopped(nextTss.popVersion);
						for (auto& it : nextTss.mutations) {
							if (data->get().shouldAddMutation(it)) {
								tssSummary.push_back(it.version);
							}
						}
					}
					// if ss has result, tss needs to return it
					when(wait((ssDone || !ssSummary.empty()) ? delay(2.0 * FLOW_KNOBS->LOAD_BALANCE_TSS_TIMEOUT)
					: Never())) {
						// TSS fell too far behind the SS: count a timeout and stop validating.
						++tssData.metrics->tssTimeouts;
						data->get().complete();
						return Void();
					}
				}
			} catch (Error& e) {
				if (e.code() == error_code_operation_cancelled) {
					throw e;
				}
				if (e.code() == error_code_end_of_stream) {
					tssDone = true;
					if (ssDone) {
						data->get().complete();
						return Void();
					}
				} else {
					// Any other TSS error ends validation without failing the client stream.
					tssData.metrics->tssError(e.code());
					data->get().complete();
					return Void();
				}
			}
		}

		// handle rollbacks and concurrent pops
		while (!ssSummary.empty() &&
		(ssSummary.front() < data->get().popVersion || data->get().isRolledBack(ssSummary.front()))) {
			ssSummary.pop_front();
		}

		while (!tssSummary.empty() &&
		(tssSummary.front() < data->get().popVersion || data->get().isRolledBack(tssSummary.front()))) {
			tssSummary.pop_front();
		}

		// Match versions pairwise off the front of both summaries; any inequality is a mismatch.
		while (!ssSummary.empty() && !tssSummary.empty()) {
			CODE_PROBE(true, "Comparing TSS change feed data");
			if (ssSummary.front() != tssSummary.front()) {
				CODE_PROBE(true, "TSS change feed mismatch");
				handleTSSChangeFeedMismatch(req,
				tssData,
				matchesFound,
				lastMatchingVersion,
				ssSummary.front(),
				tssSummary.front(),
				data->get().popVersion);
				data->get().complete();
				return Void();
			}
			matchesFound++;
			lastMatchingVersion = ssSummary.front();
			ssSummary.pop_front();
			tssSummary.pop_front();

			// Discard rollback windows entirely behind the last matched version.
			while (!data->get().rollbacks.empty() && data->get().rollbacks.front().second <= lastMatchingVersion) {
				data->get().rollbacks.pop_front();
			}
		}
		ASSERT(!ssDone || !tssDone); // both shouldn't be done, otherwise we shouldn't have looped

		// One stream ended while the other still has unmatched versions: mismatch at end of stream.
		if ((ssDone && !tssSummary.empty()) || (tssDone && !ssSummary.empty())) {
			CODE_PROBE(true, "TSS change feed mismatch at end of stream");
			handleTSSChangeFeedMismatch(req,
			tssData,
			matchesFound,
			lastMatchingVersion,
			ssDone ? -1 : ssSummary.front(),
			tssDone ? -1 : tssSummary.front(),
			data->get().popVersion);
			data->get().complete();
			return Void();
		}
	}
}
// If the storage endpoint backing `stream` has a TSS pair registered in the queue model,
// duplicate the change feed request to that TSS and start a validator actor comparing the
// two streams. `model` may be null (locality load balance disabled), in which case this is
// a no-op. `req` is reset before reuse so the duplicated request carries a fresh reply.
void maybeDuplicateTSSChangeFeedStream(ChangeFeedStreamRequest& req,
const RequestStream<ChangeFeedStreamRequest>& stream,
QueueModel* model,
Optional<ChangeFeedTSSValidationData>* tssData) {
	if (model) {
		Optional<TSSEndpointData> tssPair = model->getTssData(stream.getEndpoint().token.first());

		if (tssPair.present()) {
			CODE_PROBE(true, "duplicating feed stream to TSS");
			resetReply(req);

			RequestStream<ChangeFeedStreamRequest> tssRequestStream(tssPair.get().endpoint);
			*tssData = Optional<ChangeFeedTSSValidationData>(
			ChangeFeedTSSValidationData(tssRequestStream.getReplyStream(req)));
			// tie validator actor to the lifetime of the stream being active
			tssData->get().validatorFuture = changeFeedTSSValidator(req, tssData, tssPair.get());
		}
	}
}
ChangeFeedStorageData::~ChangeFeedStorageData() {
if (context) {
context->changeFeedUpdaters.erase(interfToken);
@ -9134,7 +9411,8 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
Version end,
Reference<ChangeFeedData> feedData,
Reference<ChangeFeedStorageData> storageData,
UID debugUID) {
UID debugUID,
Optional<ChangeFeedTSSValidationData>* tssData) {
// calling lastReturnedVersion's callbacks could cause us to be cancelled
state Promise<Void> refresh = feedData->refresh;
@ -9178,6 +9456,9 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
if (rep.popVersion > feedData->popVersion) {
feedData->popVersion = rep.popVersion;
}
if (tssData->present()) {
tssData->get().updatePopped(rep.popVersion);
}
if (lastEmpty != invalidVersion && !results.isEmpty()) {
for (auto& it : feedData->storageData) {
@ -9192,6 +9473,10 @@ ACTOR Future<Void> partialChangeFeedStream(StorageServerInterface interf,
while (resultLoc < rep.mutations.size()) {
wait(results.onEmpty());
if (rep.mutations[resultLoc].version >= nextVersion) {
if (tssData->present() && tssData->get().shouldAddMutation(rep.mutations[resultLoc])) {
tssData->get().ssStreamSummary.send(rep.mutations[resultLoc].version);
}
results.send(rep.mutations[resultLoc]);
if (DEBUG_CF_CLIENT_TRACE) {
@ -9388,6 +9673,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
state std::vector<Future<Void>> fetchers(interfs.size());
state std::vector<Future<Void>> onErrors(interfs.size());
state std::vector<MutationAndVersionStream> streams(interfs.size());
state std::vector<Optional<ChangeFeedTSSValidationData>> tssDatas;
tssDatas.reserve(interfs.size());
for (int i = 0; i < interfs.size(); i++) {
tssDatas.push_back({});
}
CODE_PROBE(interfs.size() > 10, "Large change feed merge cursor");
CODE_PROBE(interfs.size() > 100, "Very large change feed merge cursor");
@ -9395,12 +9685,12 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
state UID mergeCursorUID = UID();
state std::vector<UID> debugUIDs;
results->streams.clear();
for (auto& it : interfs) {
for (int i = 0; i < interfs.size(); i++) {
ChangeFeedStreamRequest req;
req.rangeID = rangeID;
req.begin = *begin;
req.end = end;
req.range = it.second;
req.range = interfs[i].second;
req.canReadPopped = canReadPopped;
// divide total buffer size among sub-streams, but keep individual streams large enough to be efficient
req.replyBufferSize = replyBufferSize / interfs.size();
@ -9412,7 +9702,11 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
mergeCursorUID =
UID(mergeCursorUID.first() ^ req.debugUID.first(), mergeCursorUID.second() ^ req.debugUID.second());
results->streams.push_back(it.first.changeFeedStream.getReplyStream(req));
results->streams.push_back(interfs[i].first.changeFeedStream.getReplyStream(req));
maybeDuplicateTSSChangeFeedStream(req,
interfs[i].first.changeFeedStream,
db->enableLocalityLoadBalance ? &db->queueModel : nullptr,
&tssDatas[i]);
}
results->maxSeenVersion = invalidVersion;
@ -9449,7 +9743,8 @@ ACTOR Future<Void> mergeChangeFeedStream(Reference<DatabaseContext> db,
end,
results,
results->storageData[i],
debugUIDs[i]);
debugUIDs[i],
&tssDatas[i]);
}
wait(waitForAny(onErrors) || mergeChangeFeedStreamInternal(results, interfs, streams, begin, end, mergeCursorUID));
@ -9503,7 +9798,8 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
Reference<ChangeFeedData> results,
Key rangeID,
Version* begin,
Version end) {
Version end,
Optional<ChangeFeedTSSValidationData>* tssData) {
state Promise<Void> refresh = results->refresh;
ASSERT(results->streams.size() == 1);
@ -9538,6 +9834,9 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
if (feedReply.popVersion > results->popVersion) {
results->popVersion = feedReply.popVersion;
}
if (tssData->present()) {
tssData->get().updatePopped(feedReply.popVersion);
}
// don't send completely empty set of mutations to promise stream
bool anyMutations = false;
@ -9552,6 +9851,10 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
// stream. Anything with mutations should be strictly greater than lastReturnedVersion
ASSERT(feedReply.mutations.front().version > results->lastReturnedVersion.get());
if (tssData->present()) {
tssData->get().send(feedReply);
}
results->mutations.send(
Standalone<VectorRef<MutationsAndVersionRef>>(feedReply.mutations, feedReply.arena));
@ -9603,6 +9906,7 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
bool canReadPopped) {
state Database cx(db);
state ChangeFeedStreamRequest req;
state Optional<ChangeFeedTSSValidationData> tssData;
req.rangeID = rangeID;
req.begin = *begin;
req.end = end;
@ -9636,7 +9940,11 @@ ACTOR Future<Void> singleChangeFeedStream(Reference<DatabaseContext> db,
}
refresh.send(Void());
wait(results->streams[0].onError() || singleChangeFeedStreamInternal(range, results, rangeID, begin, end));
maybeDuplicateTSSChangeFeedStream(
req, interf.changeFeedStream, cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr, &tssData);
wait(results->streams[0].onError() ||
singleChangeFeedStreamInternal(range, results, rangeID, begin, end, &tssData));
return Void();
}
@ -9982,6 +10290,8 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
return Void();
}
auto model = cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr;
bool foundFailed = false;
for (int i = 0; i < locations.size() && !foundFailed; i++) {
for (int j = 0; j < locations[i].locations->size() && !foundFailed; j++) {
@ -9990,6 +10300,15 @@ ACTOR Future<Void> popChangeFeedMutationsActor(Reference<DatabaseContext> db, Ke
.isFailed()) {
foundFailed = true;
}
// for now, if any of popping SS has a TSS pair, just always use backup method
if (model && model
->getTssData(locations[i]
.locations->get(j, &StorageServerInterface::changeFeedPop)
.getEndpoint()
.token.first())
.present()) {
foundFailed = true;
}
}
}

View File

@ -342,7 +342,7 @@ void TSS_traceMismatch(TraceEvent& event,
// change feed
template <>
bool TSS_doCompare(const OverlappingChangeFeedsReply& src, const OverlappingChangeFeedsReply& tss) {
	// This request is duplicated to the TSS purely for load, so there is no need to
	// validate the replies against each other: always report a match. (An ASSERT(false)
	// here would abort every comparison in debug builds despite that intent.)
	return true;
}

View File

@ -99,6 +99,48 @@ Tuple Tuple::unpack(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete);
}
// Render a human-readable representation of a tuple. Multi-element tuples are wrapped in
// parentheses with comma-separated elements; a single element is printed bare. Byte strings
// are quoted, UTF8 strings are quoted with a leading "u". Unknown element types assert.
std::string Tuple::tupleToString(const Tuple& tuple) {
	std::string str;
	if (tuple.size() > 1) {
		str += "(";
	}
	for (int i = 0; i < tuple.size(); ++i) {
		Tuple::ElementType type = tuple.getType(i);
		if (type == Tuple::NULL_TYPE) {
			str += "NULL";
		} else if (type == Tuple::BYTES || type == Tuple::UTF8) {
			if (type == Tuple::UTF8) {
				str += "u";
			}
			str += "\'" + tuple.getString(i).printable() + "\'";
		} else if (type == Tuple::INT) {
			// getInt() yields a 64-bit value; "%ld" is only 32 bits on LLP64 platforms
			// (e.g. Windows), so cast to long long and use "%lld" for portability.
			str += format("%lld", static_cast<long long>(tuple.getInt(i)));
		} else if (type == Tuple::FLOAT) {
			str += format("%f", tuple.getFloat(i));
		} else if (type == Tuple::DOUBLE) {
			str += format("%f", tuple.getDouble(i));
		} else if (type == Tuple::BOOL) {
			str += tuple.getBool(i) ? "true" : "false";
		} else if (type == Tuple::VERSIONSTAMP) {
			TupleVersionstamp versionstamp = tuple.getVersionstamp(i);
			// Version is 64-bit: use "%lld" with an explicit cast (see INT case above).
			str += format("Transaction Version: '%lld', BatchNumber: '%hd', UserVersion : '%hd'",
			              static_cast<long long>(versionstamp.getVersion()),
			              versionstamp.getBatchNumber(),
			              versionstamp.getUserVersion());
		} else {
			// Unknown/unsupported element type.
			ASSERT(false);
		}
		if (i < tuple.size() - 1) {
			str += ", ";
		}
	}
	if (tuple.size() > 1) {
		str += ")";
	}
	return str;
}
Tuple Tuple::unpackUserType(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete, true);
}

View File

@ -224,6 +224,7 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
bool rkBatchThrottled = false;
TransactionTagMap<ClientTagThrottleLimits> tagThrottleInfo;
double proxyTagThrottledDuration{ 0.0 };
VersionVector ssVersionVectorDelta;
UID proxyId; // GRV proxy ID to detect old GRV proxies at client side
@ -242,7 +243,8 @@ struct GetReadVersionReply : public BasicLoadBalancedReply {
rkDefaultThrottled,
rkBatchThrottled,
ssVersionVectorDelta,
proxyId);
proxyId,
proxyTagThrottledDuration);
}
};
@ -267,6 +269,10 @@ struct GetReadVersionRequest : TimedRequest {
TransactionPriority priority;
TransactionTagMap<uint32_t> tags;
// Not serialized, because this field does not need to be sent to master.
// It is used for reporting to clients the amount of time spent delayed by
// the TagQueue
double proxyTagThrottledDuration{ 0.0 };
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
@ -303,6 +309,8 @@ struct GetReadVersionRequest : TimedRequest {
bool operator<(GetReadVersionRequest const& rhs) const { return priority < rhs.priority; }
bool isTagged() const { return !tags.empty(); }
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionCount, flags, tags, debugID, reply, spanContext, maxVersion);

View File

@ -48,6 +48,7 @@ struct Tuple {
// Note that strings can't be incomplete because they are parsed such that the end of the packed
// byte string is considered the end of the string in lieu of a specific end.
static Tuple unpack(StringRef const& str, bool exclude_incomplete = false);
static std::string tupleToString(Tuple const& tuple);
static Tuple unpackUserType(StringRef const& str, bool exclude_incomplete = false);
Tuple& append(Tuple const& tuple);

View File

@ -344,7 +344,7 @@ class TransactionEnvironment {
state Key configKey = encodeConfigKey(configClass, knobName);
state Optional<Value> value = wait(tr->get(configKey));
if (expected.present()) {
ASSERT_EQ(BinaryReader::fromStringRef<int64_t>(value.get(), Unversioned()), expected.get());
ASSERT_EQ(Tuple::unpack(value.get()).getInt(0), expected.get());
} else {
ASSERT(!value.present());
}

View File

@ -400,8 +400,8 @@ public:
void addRequests(TransactionTag tag, int count) { tagStatistics[tag].addTransactions(static_cast<double>(count)); }
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
PrioritizedTransactionTagMap<double> getProxyRates(int numProxies) {
PrioritizedTransactionTagMap<double> result;
TransactionTagMap<double> getProxyRates(int numProxies) {
TransactionTagMap<double> result;
lastBusyTagCount = 0;
for (auto& [tag, stats] : tagStatistics) {
@ -414,8 +414,7 @@ public:
}
if (targetTps.present()) {
auto const smoothedTargetTps = stats.updateAndGetTargetLimit(targetTps.get());
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] =
smoothedTargetTps / numProxies;
result[tag] = smoothedTargetTps / numProxies;
} else {
te.disable();
}
@ -497,7 +496,7 @@ uint64_t GlobalTagThrottler::getThrottledTagChangeId() const {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> GlobalTagThrottler::getClientRates() {
return impl->getClientRates();
}
PrioritizedTransactionTagMap<double> GlobalTagThrottler::getProxyRates(int numProxies) {
TransactionTagMap<double> GlobalTagThrottler::getProxyRates(int numProxies) {
return impl->getProxyRates(numProxies);
}
int64_t GlobalTagThrottler::autoThrottleCount() const {
@ -679,12 +678,9 @@ bool isNear(Optional<double> a, Optional<double> b) {
bool targetRateIsNear(GlobalTagThrottler& globalTagThrottler, TransactionTag tag, Optional<double> expected) {
Optional<double> rate;
auto targetRates = globalTagThrottler.getProxyRates(1);
auto it1 = targetRates.find(TransactionPriority::DEFAULT);
if (it1 != targetRates.end()) {
auto it2 = it1->second.find(tag);
if (it2 != it1->second.end()) {
rate = it2->second;
}
auto it = targetRates.find(tag);
if (it != targetRates.end()) {
rate = it->second;
}
TraceEvent("GlobalTagThrottling_RateMonitor")
.detail("Tag", tag)

View File

@ -24,11 +24,13 @@
#include "fdbclient/Notified.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbclient/Tuple.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/GrvProxyInterface.h"
#include "fdbclient/VersionVector.h"
#include "fdbserver/GrvProxyTransactionTagThrottler.h"
#include "fdbserver/GrvTransactionRateInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbrpc/sim_validation.h"
@ -158,83 +160,6 @@ struct GrvProxyStats {
}
};
// Window-based rate limiter controlling how many transactions a GRV proxy may release at a
// given priority. Tracks a target rate, a per-window release limit, and a budget carrying
// unused (or overdrawn) capacity between windows.
struct GrvTransactionRateInfo {
	// Target release rate (transactions/sec), as set by setRate().
	double rate;
	// Number of transactions releasable this window; recomputed by reset().
	double limit;
	// Carry-over capacity between windows; updated by updateBudget().
	double budget;
	// True until the first setRate() call (and after disable()).
	bool disabled;
	// Smoothed target rate over START_TRANSACTION_RATE_WINDOW.
	Smoother smoothRate;
	// Smoothed rate of transactions actually released.
	Smoother smoothReleased;

	GrvTransactionRateInfo(double rate = 0.0)
	: rate(rate), limit(0), budget(0), disabled(true), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
	smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {}

	void reset() {
		// Determine the number of transactions that this proxy is allowed to release
		// Roughly speaking, this is done by computing the number of transactions over some historical window that we
		// could have started but didn't, and making that our limit. More precisely, we track a smoothed rate limit and
		// release rate, the difference of which is the rate of additional transactions that we could have released
		// based on that window. Then we multiply by the window size to get a number of transactions.
		//
		// Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the
		// use of our budget or because of higher priority transactions).
		double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
		limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate;
	}

	// Whether `count` more transactions may start given `numAlreadyStarted` this window,
	// bounded by limit+budget and the absolute per-window start cap.
	bool canStart(int64_t numAlreadyStarted, int64_t count) const {
		return numAlreadyStarted + count <=
		std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
	}

	void updateBudget(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) {
		// Update the budget to accumulate any extra capacity available or remove any excess that was used.
		// The actual delta is the portion of the limit we didn't use multiplied by the fraction of the window that
		// elapsed.
		//
		// We may have exceeded our limit due to the budget or because of higher priority transactions, in which case
		// this delta will be negative. The delta can also be negative in the event that our limit was negative, which
		// can happen if we had already started more transactions in our window than our rate would have allowed.
		//
		// This budget has the property that when the budget is required to start transactions (because batches are
		// big), the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the
		// batch size upon starting a batch. In other words, this works equivalently to a model where we linearly
		// accumulate budget over time in the case that our batches are too big to take advantage of the window based
		// limits.
		budget = std::max(
		0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);

		// If we are emptying out the queue of requests, then we don't need to carry much budget forward
		// If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised
		if (queueEmptyAtPriority) {
			budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
		}

		smoothReleased.addDelta(numStartedAtPriority);
	}

	void disable() {
		disabled = true;
		// Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately.
		smoothRate.setTotal(0);
	}

	// Set a new target rate; must be finite and non-negative. The first call after
	// construction/disable() resets the smoother so the new rate takes effect at once.
	void setRate(double rate) {
		ASSERT(rate >= 0 && rate != std::numeric_limits<double>::infinity() && !std::isnan(rate));

		this->rate = rate;
		if (disabled) {
			smoothRate.reset(rate);
			disabled = false;
		} else {
			smoothRate.setTotal(rate);
		}
	}
};
struct GrvProxyData {
GrvProxyInterface proxy;
UID dbgid;
@ -437,7 +362,7 @@ ACTOR Future<Void> getRate(UID myID,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* clientThrottledTags,
PrioritizedTransactionTagMap<GrvTransactionRateInfo>* perTagRateInfo,
GrvProxyTransactionTagThrottler* tagThrottler,
GrvProxyStats* stats,
GrvProxyData* proxyData) {
state Future<Void> nextRequestTimer = Never();
@ -498,12 +423,7 @@ ACTOR Future<Void> getRate(UID myID,
*clientThrottledTags = std::move(rep.clientThrottledTags.get());
}
if (rep.proxyThrottledTags.present()) {
perTagRateInfo->clear();
for (const auto& [priority, tagToRate] : rep.proxyThrottledTags.get()) {
for (const auto& [tag, rate] : tagToRate) {
(*perTagRateInfo)[priority][tag].setRate(rate);
}
}
tagThrottler->updateRates(rep.proxyThrottledTags.get());
}
}
when(wait(leaseTimeout)) {
@ -537,20 +457,19 @@ void dropRequestFromQueue(Deque<GetReadVersionRequest>* queue, GrvProxyStats* st
}
// Put a GetReadVersion request into the queue corresponding to its priority.
ACTOR Future<Void> queueGetReadVersionRequests(
Reference<AsyncVar<ServerDBInfo> const> db,
SpannedDeque<GetReadVersionRequest>* systemQueue,
SpannedDeque<GetReadVersionRequest>* defaultQueue,
SpannedDeque<GetReadVersionRequest>* batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer,
double* lastGRVTime,
double* GRVBatchTime,
FutureStream<double> normalGRVLatency,
GrvProxyStats* stats,
GrvTransactionRateInfo* batchRateInfo,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<GrvTransactionRateInfo> const* perClientRateInfo) {
ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo> const> db,
SpannedDeque<GetReadVersionRequest>* systemQueue,
SpannedDeque<GetReadVersionRequest>* defaultQueue,
SpannedDeque<GetReadVersionRequest>* batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer,
double* lastGRVTime,
double* GRVBatchTime,
FutureStream<double> normalGRVLatency,
GrvProxyStats* stats,
GrvTransactionRateInfo* batchRateInfo,
TransactionTagMap<uint64_t>* transactionTagCounter,
GrvProxyTransactionTagThrottler* tagThrottler) {
getCurrentLineage()->modify(&TransactionLineage::operation) =
TransactionLineage::Operation::GetConsistentReadVersion;
loop choose {
@ -617,12 +536,16 @@ ACTOR Future<Void> queueGetReadVersionRequests(
stats->txnStartIn += req.transactionCount;
stats->txnDefaultPriorityStartIn += req.transactionCount;
++stats->defaultGRVQueueSize;
defaultQueue->push_back(req);
if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES && req.isTagged()) {
tagThrottler->addRequest(req);
} else {
defaultQueue->push_back(req);
}
// defaultQueue->span.addParent(req.spanContext);
} else {
// Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.grvProxies.size(), 1);
if (batchRateInfo->rate <= (1.0 / proxiesCount)) {
if (batchRateInfo->getRate() <= (1.0 / proxiesCount)) {
req.reply.sendError(batch_transaction_throttled());
stats->txnThrottled += req.transactionCount;
} else {
@ -630,7 +553,11 @@ ACTOR Future<Void> queueGetReadVersionRequests(
stats->txnStartIn += req.transactionCount;
stats->txnBatchPriorityStartIn += req.transactionCount;
++stats->batchGRVQueueSize;
batchQueue->push_back(req);
if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES && req.isTagged()) {
tagThrottler->addRequest(req);
} else {
batchQueue->push_back(req);
}
// batchQueue->span.addParent(req.spanContext);
}
}
@ -791,6 +718,7 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture,
grvProxyData->versionVectorSizeOnGRVReply.addMeasurement(reply.ssVersionVectorDelta.size());
}
reply.proxyId = grvProxyData->dbgid;
reply.proxyTagThrottledDuration = request.proxyTagThrottledDuration;
if (!request.tags.empty()) {
auto& priorityThrottledTags = clientThrottledTags[request.priority];
@ -895,7 +823,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
state int64_t batchTransactionCount = 0;
state GrvTransactionRateInfo normalRateInfo(10);
state GrvTransactionRateInfo batchRateInfo(0);
state PrioritizedTransactionTagMap<GrvTransactionRateInfo> perTagRateInfo;
state GrvProxyTransactionTagThrottler tagThrottler;
state SpannedDeque<GetReadVersionRequest> systemQueue("GP:transactionStarterSystemQueue"_loc);
state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:transactionStarterDefaultQueue"_loc);
@ -922,7 +850,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
detailedHealthMetricsReply,
&transactionTagCounter,
&clientThrottledTags,
&perTagRateInfo,
&tagThrottler,
&grvProxyData->stats,
grvProxyData));
addActor.send(queueGetReadVersionRequests(db,
@ -937,7 +865,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
&grvProxyData->stats,
&batchRateInfo,
&transactionTagCounter,
&perTagRateInfo));
&tagThrottler));
while (std::find(db->get().client.grvProxies.begin(), db->get().client.grvProxies.end(), proxy) ==
db->get().client.grvProxies.end()) {
@ -960,11 +888,12 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
elapsed = 1e-15;
}
normalRateInfo.reset();
batchRateInfo.reset();
tagThrottler.releaseTransactions(elapsed, defaultQueue, batchQueue);
normalRateInfo.startReleaseWindow();
batchRateInfo.startReleaseWindow();
grvProxyData->stats.transactionLimit = normalRateInfo.limit;
grvProxyData->stats.batchTransactionLimit = batchRateInfo.limit;
grvProxyData->stats.transactionLimit = normalRateInfo.getLimit();
grvProxyData->stats.batchTransactionLimit = batchRateInfo.getLimit();
int transactionsStarted[2] = { 0, 0 };
int systemTransactionsStarted[2] = { 0, 0 };
@ -1071,11 +1000,11 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
transactionCount += transactionsStarted[0] + transactionsStarted[1];
batchTransactionCount += batchTotalStarted;
normalRateInfo.updateBudget(
normalRateInfo.endReleaseWindow(
systemTotalStarted + normalTotalStarted, systemQueue.empty() && defaultQueue.empty(), elapsed);
batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted,
systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(),
elapsed);
batchRateInfo.endReleaseWindow(systemTotalStarted + normalTotalStarted + batchTotalStarted,
systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(),
elapsed);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug",

View File

@ -0,0 +1,349 @@
/*
* GrvProxyTransactionTagThrottler.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/GrvProxyTransactionTagThrottler.h"

#include <functional>
#include <limits>
#include <queue>
#include <vector>

#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // must be last include
// Definition of the monotonically increasing sequence counter shared by all DelayedRequests
uint64_t GrvProxyTransactionTagThrottler::DelayedRequest::lastSequenceNumber = 0;

// Records how long this request has spent queued in the throttler
// (now() - startTime) into req.proxyTagThrottledDuration.
void GrvProxyTransactionTagThrottler::DelayedRequest::updateProxyTagThrottledDuration() {
	req.proxyTagThrottledDuration = now() - startTime;
}
// Applies a new rate to this tag's queue, constructing the rate limiter on first use.
void GrvProxyTransactionTagThrottler::TagQueue::setRate(double rate) {
	if (!rateInfo.present()) {
		rateInfo = GrvTransactionRateInfo(rate);
	} else {
		rateInfo.get().setRate(rate);
	}
}
// Applies the latest per-tag rates received from ratekeeper and cleans up stale tags.
void GrvProxyTransactionTagThrottler::updateRates(TransactionTagMap<double> const& newRates) {
	// Apply each new rate, creating a queue for tags not seen before
	for (const auto& [tag, rate] : newRates) {
		auto it = queues.find(tag);
		if (it != queues.end()) {
			it->second.setRate(rate);
		} else {
			queues[tag] = TagQueue(rate);
		}
	}

	// Drop rate limiters for tags that did not appear in newRates
	for (auto& [tag, queue] : queues) {
		if (newRates.find(tag) == newRates.end()) {
			queue.rateInfo.reset();
		}
	}

	// Erase queues that have neither pending requests nor a rate limiter
	// TODO: Use std::erase_if in C++20
	auto it = queues.begin();
	while (it != queues.end()) {
		if (it->second.requests.empty() && !it->second.rateInfo.present()) {
			it = queues.erase(it);
		} else {
			++it;
		}
	}
}
// Enqueues a tagged GRV request for throttling. Only the first tag is used;
// requests carrying multiple tags trigger a warning trace event.
void GrvProxyTransactionTagThrottler::addRequest(GetReadVersionRequest const& req) {
	ASSERT(req.isTagged());
	auto const numTags = req.tags.size();
	auto const& tag = req.tags.begin()->first;
	if (numTags > 1) {
		// The GrvProxyTransactionTagThrottler assumes each GetReadVersionRequest has
		// at most one tag. When SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES is
		// enabled, a multi-tag transaction is throttled using only one of its tags,
		// which may behave unexpectedly.
		TraceEvent(SevWarnAlways, "GrvProxyTransactionTagThrottler_MultipleTags")
		    .detail("NumTags", numTags)
		    .detail("UsingTag", printable(tag));
	}
	queues[tag].requests.emplace_back(req);
}
// Runs once per release window: walks all tag queues in global FIFO order
// (by request sequence number), releasing each request into the output deque
// matching its priority while the tag's rate limiter permits, then ends the
// release window on every rate limiter.
//
// elapsed: duration of the release window just ended, in seconds.
void GrvProxyTransactionTagThrottler::releaseTransactions(double elapsed,
                                                          SpannedDeque<GetReadVersionRequest>& outBatchPriority,
                                                          SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
	// Pointer to a TagQueue with some extra metadata stored alongside
	struct TagQueueHandle {
		// Store pointers here to avoid frequent std::unordered_map lookups
		TagQueue* queue;
		// Points into the transactionsReleased vector below, so each handle can
		// update its tag's released-count without a std::unordered_map lookup.
		uint32_t* numReleased;
		// Sequence number of the first queued request
		int64_t nextSeqNo;

		// Compared through std::greater so the priority queue pops the handle with
		// the *smallest* sequence number first, preserving FIFO order across tags.
		// (The default max-heap ordering would pop the newest request first and the
		// "nextSeqNo < nextQueueSeqNo" FIFO guard below could never make progress
		// once two tags have queued requests.)
		bool operator>(TagQueueHandle const& rhs) const { return nextSeqNo > rhs.nextSeqNo; }

		explicit TagQueueHandle(TagQueue& queue, uint32_t& numReleased) : queue(&queue), numReleased(&numReleased) {
			ASSERT(!this->queue->requests.empty());
			nextSeqNo = this->queue->requests.front().sequenceNumber;
		}
	};

	// Min-heap of queues for each tag, ordered by the sequence number of the
	// next request to process in each queue
	std::priority_queue<TagQueueHandle, std::vector<TagQueueHandle>, std::greater<TagQueueHandle>> pqOfQueues;

	// Track transactions released for each tag
	std::vector<std::pair<TransactionTag, uint32_t>> transactionsReleased;
	transactionsReleased.reserve(queues.size());
	auto const transactionsReleasedInitialCapacity = transactionsReleased.capacity();

	for (auto& [tag, queue] : queues) {
		if (queue.rateInfo.present()) {
			queue.rateInfo.get().startReleaseWindow();
		}
		if (!queue.requests.empty()) {
			// First place the count in the transactionsReleased object,
			// then pass a reference to the count to the TagQueueHandle object
			// emplaced into pqOfQueues.
			//
			// Because we've reserved enough space in transactionsReleased
			// to avoid resizing, this reference should remain valid.
			// This allows each TagQueueHandle to update its
			// numReleased counter without incurring the cost of a std::unordered_map lookup.
			auto& [_, count] = transactionsReleased.emplace_back(tag, 0);
			pqOfQueues.emplace(queue, count);
		}
	}

	while (!pqOfQueues.empty()) {
		auto tagQueueHandle = pqOfQueues.top();
		pqOfQueues.pop();
		// Used to determine when it is time to start processing another tag
		auto const nextQueueSeqNo =
		    pqOfQueues.empty() ? std::numeric_limits<int64_t>::max() : pqOfQueues.top().nextSeqNo;

		while (!tagQueueHandle.queue->requests.empty()) {
			auto& delayedReq = tagQueueHandle.queue->requests.front();
			auto count = delayedReq.req.tags.begin()->second;
			ASSERT_EQ(tagQueueHandle.nextSeqNo, delayedReq.sequenceNumber);
			if (tagQueueHandle.queue->rateInfo.present() &&
			    !tagQueueHandle.queue->rateInfo.get().canStart(*(tagQueueHandle.numReleased), count)) {
				// Cannot release any more transactions from this tag (don't push the tag queue handle back into
				// pqOfQueues)
				CODE_PROBE(true, "GrvProxyTransactionTagThrottler::releaseTransactions : Throttling transaction");
				break;
			} else {
				if (tagQueueHandle.nextSeqNo < nextQueueSeqNo) {
					// This request is globally next in FIFO order: release it
					*(tagQueueHandle.numReleased) += count;
					delayedReq.updateProxyTagThrottledDuration();
					if (delayedReq.req.priority == TransactionPriority::BATCH) {
						outBatchPriority.push_back(delayedReq.req);
					} else if (delayedReq.req.priority == TransactionPriority::DEFAULT) {
						outDefaultPriority.push_back(delayedReq.req);
					} else {
						// Immediate priority transactions should bypass the GrvProxyTransactionTagThrottler
						ASSERT(false);
					}
					tagQueueHandle.queue->requests.pop_front();
					if (!tagQueueHandle.queue->requests.empty()) {
						tagQueueHandle.nextSeqNo = tagQueueHandle.queue->requests.front().sequenceNumber;
					}
				} else {
					// Another tag holds an older request; requeue this handle and switch
					CODE_PROBE(
					    true, "GrvProxyTransactionTagThrottler::releaseTransactions : Switching tags to preserve FIFO");
					pqOfQueues.push(tagQueueHandle);
					break;
				}
			}
		}
	}

	// End release windows for queues with valid rateInfo
	{
		TransactionTagMap<uint32_t> transactionsReleasedMap;
		for (const auto& [tag, count] : transactionsReleased) {
			transactionsReleasedMap[tag] = count;
		}
		for (auto& [tag, queue] : queues) {
			if (queue.rateInfo.present()) {
				queue.rateInfo.get().endReleaseWindow(transactionsReleasedMap[tag], false, elapsed);
			}
		}
	}
	// If the capacity is increased, that means the vector has been illegally resized, potentially
	// corrupting memory
	ASSERT_EQ(transactionsReleased.capacity(), transactionsReleasedInitialCapacity);
}
// Number of tags currently tracked (testing only)
uint32_t GrvProxyTransactionTagThrottler::size() {
	return static_cast<uint32_t>(queues.size());
}
// Simulated client: submits GetReadVersionRequests (each carrying every tag in
// tagSet with a count of batchSize) to the throttler, attempting roughly
// desiredRate transactions per second. After each request is granted, the
// per-tag counters are incremented by batchSize.
ACTOR static Future<Void> mockClient(GrvProxyTransactionTagThrottler* throttler,
                                     TransactionPriority priority,
                                     TagSet tagSet,
                                     int batchSize,
                                     double desiredRate,
                                     TransactionTagMap<uint32_t>* counters) {
	state Future<Void> timer;
	state TransactionTagMap<uint32_t> tags;
	for (const auto& tag : tagSet) {
		tags[tag] = batchSize;
	}
	loop {
		// Pace submission: one batch of batchSize transactions every batchSize/desiredRate seconds
		timer = delayJittered(static_cast<double>(batchSize) / desiredRate);
		GetReadVersionRequest req;
		req.tags = tags;
		req.priority = priority;
		throttler->addRequest(req);
		// Wait for the throttler (via mockServer) to release the request AND for the pacing timer
		wait(success(req.reply.getFuture()) && timer);
		for (auto& [tag, _] : tags) {
			(*counters)[tag] += batchSize;
		}
	}
}
// Simulated GRV proxy loop: roughly every 10ms (release window of 9-11ms),
// asks the throttler to release queued requests and replies immediately to
// every released request at both priorities.
ACTOR static Future<Void> mockServer(GrvProxyTransactionTagThrottler* throttler) {
	state SpannedDeque<GetReadVersionRequest> outBatchPriority("TestGrvProxyTransactionTagThrottler_Batch"_loc);
	state SpannedDeque<GetReadVersionRequest> outDefaultPriority("TestGrvProxyTransactionTagThrottler_Default"_loc);
	loop {
		// Randomized release-window length in [9ms, 11ms)
		state double elapsed = (0.009 + 0.002 * deterministicRandom()->random01());
		wait(delay(elapsed));
		throttler->releaseTransactions(elapsed, outBatchPriority, outDefaultPriority);
		while (!outBatchPriority.empty()) {
			outBatchPriority.front().reply.send(GetReadVersionReply{});
			outBatchPriority.pop_front();
		}
		while (!outDefaultPriority.empty()) {
			outDefaultPriority.front().reply.send(GetReadVersionReply{});
			outDefaultPriority.pop_front();
		}
	}
}
// Produces a random 32-byte transaction tag, allocated in the tag's own arena.
static TransactionTag getRandomTag() {
	TransactionTag result;
	constexpr int tagLength = 32;
	auto bytes = new (result.arena()) uint8_t[tagLength];
	for (int j = 0; j < tagLength; ++j) {
		bytes[j] = (uint8_t)deterministicRandom()->randomInt(0, 256);
	}
	result.contents() = TransactionTagRef(bytes, tagLength);
	return result;
}
// True when actual is within 10% of desired (i.e. 10 * |desired - actual| < desired).
static bool isNear(double desired, int64_t actual) {
	double const error = std::abs(desired - actual);
	return 10 * error < desired;
}
// Rate limit set at 10, but client attempts 20 transactions per second.
// Client should be throttled to only 10 transactions per second.
TEST_CASE("/GrvProxyTransactionTagThrottler/Simple") {
	state GrvProxyTransactionTagThrottler throttler;
	state TagSet tagSet;
	state TransactionTagMap<uint32_t> counters;
	{
		TransactionTagMap<double> rates;
		rates["sampleTag"_sr] = 10.0;
		throttler.updateRates(rates);
	}
	tagSet.addTag("sampleTag"_sr);
	state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters);
	state Future<Void> server = mockServer(&throttler);
	// Run client and server for 60 seconds, then check the observed release count
	wait(timeout(client && server, 60.0, Void()));
	TraceEvent("TagQuotaTest_Simple").detail("Counter", counters["sampleTag"_sr]);
	// NOTE(review): isNear's signature is (desired, actual); here the measured counter is
	// passed first, so the 10% tolerance is relative to the measurement — confirm intent.
	ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
	return Void();
}
// Clients share the available 30 transaction/second budget
TEST_CASE("/GrvProxyTransactionTagThrottler/MultiClient") {
	state GrvProxyTransactionTagThrottler throttler;
	state TagSet tagSet;
	state TransactionTagMap<uint32_t> counters;
	{
		TransactionTagMap<double> rates;
		rates["sampleTag"_sr] = 30.0;
		throttler.updateRates(rates);
	}
	tagSet.addTag("sampleTag"_sr);
	// Ten clients, each attempting 10 tx/s on the same tag: 100 tx/s offered in total
	state std::vector<Future<Void>> clients;
	clients.reserve(10);
	for (int i = 0; i < 10; ++i) {
		clients.push_back(mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 10.0, &counters));
	}
	state Future<Void> server = mockServer(&throttler);
	wait(timeout(waitForAll(clients) && server, 60.0, Void()));
	TraceEvent("TagQuotaTest_MultiClient").detail("Counter", counters["sampleTag"_sr]);
	// Expect roughly 30 tx/s released over 60 seconds across all clients combined
	ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 30.0));
	return Void();
}
// Test processing GetReadVersionRequests that batch several transactions
TEST_CASE("/GrvProxyTransactionTagThrottler/Batch") {
	state GrvProxyTransactionTagThrottler throttler;
	state TagSet tagSet;
	state TransactionTagMap<uint32_t> counters;
	{
		TransactionTagMap<double> rates;
		rates["sampleTag"_sr] = 10.0;
		throttler.updateRates(rates);
	}
	tagSet.addTag("sampleTag"_sr);
	// Each request carries a batch of 5 transactions; offered rate is 20 tx/s vs. a 10 tx/s quota
	state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 5, 20.0, &counters);
	state Future<Void> server = mockServer(&throttler);
	wait(timeout(client && server, 60.0, Void()));
	TraceEvent("TagQuotaTest_Batch").detail("Counter", counters["sampleTag"_sr]);
	// Batching should not change the enforced rate: expect ~10 tx/s over 60 seconds
	ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
	return Void();
}
// Tests cleanup of tags that are no longer throttled.
// Each updateRates call carries exactly one (new, random) tag, so all
// previously-seen tags must be erased and size() must stay at 1.
TEST_CASE("/GrvProxyTransactionTagThrottler/Cleanup1") {
	GrvProxyTransactionTagThrottler throttler;
	for (int iteration = 0; iteration < 1000; ++iteration) {
		TransactionTagMap<double> rates;
		rates[getRandomTag()] = 10.0;
		throttler.updateRates(rates);
		ASSERT_EQ(throttler.size(), 1);
	}
	return Void();
}
// Tests cleanup of tags once queues have been emptied
TEST_CASE("/GrvProxyTransactionTagThrottler/Cleanup2") {
	GrvProxyTransactionTagThrottler throttler;
	{
		GetReadVersionRequest req;
		req.tags["sampleTag"_sr] = 1;
		req.priority = TransactionPriority::DEFAULT;
		throttler.addRequest(req);
	}
	ASSERT_EQ(throttler.size(), 1);
	// Removing the rate alone must not drop the tag: a request is still queued
	throttler.updateRates(TransactionTagMap<double>{});
	ASSERT_EQ(throttler.size(), 1);
	{
		SpannedDeque<GetReadVersionRequest> outBatchPriority("TestGrvProxyTransactionTagThrottler_Batch"_loc);
		SpannedDeque<GetReadVersionRequest> outDefaultPriority("TestGrvProxyTransactionTagThrottler_Default"_loc);
		// With no rateInfo present, the queued request is released immediately
		throttler.releaseTransactions(0.1, outBatchPriority, outDefaultPriority);
	}
	// Calling updates cleans up the queues in throttler
	throttler.updateRates(TransactionTagMap<double>{});
	ASSERT_EQ(throttler.size(), 0);
	return Void();
}

View File

@ -0,0 +1,123 @@
/*
* GrvTransactionRateInfo.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/GrvTransactionRateInfo.h"
#include "fdbserver/Knobs.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // must be last include
// Initializes both smoothers with a smoothing time constant of
// START_TRANSACTION_RATE_WINDOW, and starts the rate smoother at the given rate.
GrvTransactionRateInfo::GrvTransactionRateInfo(double rate)
  : rate(rate), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
    smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {
	smoothRate.setTotal(rate);
}
// Returns whether "count" additional transactions may be released, given that
// numAlreadyStarted have already been released in the current release window.
bool GrvTransactionRateInfo::canStart(int64_t numAlreadyStarted, int64_t count) const {
	// Up to limit + carried-over budget may start, capped by the knob
	double const cap = std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
	return numAlreadyStarted + count <= cap;
}
// Ends a release window.
//   numStartedAtPriority: transactions released at this priority during the window.
//   queueEmptyAtPriority: whether the request queue for this priority was drained.
//   elapsed: wall-clock duration of the release window, in seconds.
void GrvTransactionRateInfo::endReleaseWindow(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) {
	// Update the budget to accumulate any extra capacity available or remove any excess that was used.
	// The actual delta is the portion of the limit we didn't use multiplied by the fraction of the rate window that
	// elapsed.
	//
	// We may have exceeded our limit due to the budget or because of higher priority transactions, in which case
	// this delta will be negative. The delta can also be negative in the event that our limit was negative, which
	// can happen if we had already started more transactions in our rate window than our rate would have allowed.
	//
	// This budget has the property that when the budget is required to start transactions (because batches are
	// big), the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the
	// batch size upon starting a batch. In other words, this works equivalently to a model where we linearly
	// accumulate budget over time in the case that our batches are too big to take advantage of the rate window based
	// limits.
	//
	// Note that "rate window" here indicates a period of SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW seconds,
	// whereas "release window" is the period between wait statements, with duration indicated by "elapsed."
	budget =
	    std::max(0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);

	// If we are emptying out the queue of requests, then we don't need to carry much budget forward
	// If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised
	if (queueEmptyAtPriority) {
		budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
	}

	// Feed the released count into the smoother used by startReleaseWindow
	smoothReleased.addDelta(numStartedAtPriority);
}
// Smoothly drives the transaction rate to zero. Called when new rates have not
// been received for a sufficiently long period of time.
void GrvTransactionRateInfo::disable() {
	// Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately.
	smoothRate.setTotal(0);
	disabled = true;
}
// Smoothly transitions to the given rate. If currently disabled, the smoother
// is reset to the new rate and the rate info is re-enabled.
void GrvTransactionRateInfo::setRate(double rate) {
	ASSERT(rate >= 0 && rate != std::numeric_limits<double>::infinity() && !std::isnan(rate));
	this->rate = rate;
	if (!disabled) {
		smoothRate.setTotal(rate);
	} else {
		smoothRate.reset(rate);
		disabled = false;
	}
}
void GrvTransactionRateInfo::startReleaseWindow() {
	// Determine the number of transactions that this proxy is allowed to release in
	// the upcoming release window. The smoothed target rate minus the smoothed
	// observed release rate is the rate of additional transactions we could have
	// released over the recent past; multiplying by the rate window converts that
	// rate into a transaction count.
	//
	// The limit can be negative in the event that we are releasing more transactions
	// than we are allowed (due to the use of our budget or because of higher priority
	// transactions).
	double const spareRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
	limit = spareRate * SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW;
}
// True when actual is within 10% of desired.
static bool isNear(double desired, int64_t actual) {
	double const diff = std::abs(desired - actual);
	return diff * 10 < desired;
}
// Simulated client: attempts one transaction roughly every 1/desiredRate seconds,
// incrementing *counter each time rateInfo allows the transaction to start.
ACTOR static Future<Void> mockClient(GrvTransactionRateInfo* rateInfo, double desiredRate, int64_t* counter) {
	loop {
		// Jittered pause of about 1/desiredRate seconds (uniform in [0.9, 1.1) / desiredRate)
		state double elapsed = (0.9 + 0.2 * deterministicRandom()->random01()) / desiredRate;
		wait(delay(elapsed));
		// One full release window per iteration: start, try one transaction, end
		rateInfo->startReleaseWindow();
		int started = rateInfo->canStart(0, 1) ? 1 : 0;
		*counter += started;
		rateInfo->endReleaseWindow(started, false, elapsed);
	}
}
// Rate limit set at 10, but client attempts 20 transactions per second.
// Client should be throttled to only 10 transactions per second.
TEST_CASE("/GrvTransactionRateInfo/Simple") {
	state GrvTransactionRateInfo rateInfo;
	// Must be zero-initialized: mockClient only increments the counter, so an
	// indeterminate starting value would make the assertion below meaningless.
	state int64_t counter = 0;
	rateInfo.setRate(10.0);
	wait(timeout(mockClient(&rateInfo, 20.0, &counter), 60.0, Void()));
	TraceEvent("GrvTransactionRateInfoTest").detail("Counter", counter);
	// Expect ~10 tx/s over 60 seconds, within isNear's 10% tolerance
	ASSERT(isNear(60.0 * 10.0, counter));
	return Void();
}

View File

@ -379,6 +379,7 @@ public:
bool allowDisablingTenants = true;
bool allowCreatingTenants = true;
bool injectTargetedSSRestart = false;
bool tenantModeRequired = false;
bool injectSSDelay = false;
// By default, tenant mode is set randomly
// If provided, set using TenantMode::fromValue
@ -448,6 +449,7 @@ public:
.add("allowDefaultTenant", &allowDefaultTenant)
.add("allowDisablingTenants", &allowDisablingTenants)
.add("allowCreatingTenants", &allowCreatingTenants)
.add("tenantModeRequired", &tenantModeRequired)
.add("randomlyRenameZoneId", &randomlyRenameZoneId)
.add("injectTargetedSSRestart", &injectTargetedSSRestart)
.add("injectSSDelay", &injectSSDelay)
@ -2476,6 +2478,7 @@ ACTOR void setupAndRun(std::string dataFolder,
state bool allowDefaultTenant = testConfig.allowDefaultTenant;
state bool allowDisablingTenants = testConfig.allowDisablingTenants;
state bool allowCreatingTenants = testConfig.allowCreatingTenants;
state bool tenantModeRequired = testConfig.tenantModeRequired;
if (!SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
testConfig.storageEngineExcludeTypes.push_back(5);
@ -2492,6 +2495,7 @@ ACTOR void setupAndRun(std::string dataFolder,
// TODO: persist the chosen default tenant in the restartInfo.ini file for the second test
// allowDefaultTenant = false;
// allowCreatingTenants = false;
// tenantModeRequired = false;
}
// TODO: Currently backup and restore related simulation tests are failing when run with rocksDB storage engine
@ -2548,10 +2552,10 @@ ACTOR void setupAndRun(std::string dataFolder,
if (testConfig.tenantMode != "random") {
tenantMode = TenantMode::fromValue(testConfig.tenantMode);
} else {
if (allowDefaultTenant && deterministicRandom()->random01() < 0.5) {
if (tenantModeRequired || (allowDefaultTenant && deterministicRandom()->random01() < 0.5)) {
defaultTenant = "SimulatedDefaultTenant"_sr;
tenantsToCreate.push_back_deep(tenantsToCreate.arena(), defaultTenant.get());
if (deterministicRandom()->random01() < 0.9) {
if (tenantModeRequired || deterministicRandom()->random01() < 0.9) {
tenantMode = TenantMode::REQUIRED;
} else {
tenantMode = TenantMode::OPTIONAL_TENANT;

View File

@ -0,0 +1,79 @@
/*
* GrvProxyTransactionTagThrottler.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbserver/GrvTransactionRateInfo.h"
// GrvProxyTransactionTagThrottler is used to throttle GetReadVersionRequests based on tag quotas
// before they're pushed into priority-partitioned queues.
//
// A GrvTransactionRateInfo object and a request queue are maintained for each tag.
// The GrvTransactionRateInfo object is used to determine when a request can be released.
//
// Between each set of waits, releaseTransactions is run, releasing queued transactions
// that have passed the tag throttling stage. Transactions that are not yet ready
// are requeued during releaseTransactions.
class GrvProxyTransactionTagThrottler {
	// A GetReadVersionRequest annotated with the time it entered the throttler and a
	// globally increasing sequence number used to preserve FIFO order across tags.
	class DelayedRequest {
		// Monotonically increasing counter shared by all DelayedRequests (defined in the .cpp file)
		static uint64_t lastSequenceNumber;
		// Time at which this request was added to the throttler
		double startTime;

	public:
		GetReadVersionRequest req;
		// Arrival-order position of this request, across all tags
		uint64_t sequenceNumber;

		explicit DelayedRequest(GetReadVersionRequest const& req)
		  : req(req), startTime(now()), sequenceNumber(++lastSequenceNumber) {}

		// Records now() - startTime into req.proxyTagThrottledDuration
		void updateProxyTagThrottledDuration();
	};

	// Per-tag state: a rate limiter (absent when ratekeeper no longer throttles the
	// tag) and the queue of delayed requests for that tag.
	struct TagQueue {
		Optional<GrvTransactionRateInfo> rateInfo;
		Deque<DelayedRequest> requests;

		explicit TagQueue(double rate = 0.0) : rateInfo(rate) {}

		// Updates (or constructs) rateInfo with the given rate
		void setRate(double rate);
	};

	// Track the budgets for each tag
	TransactionTagMap<TagQueue> queues;

public:
	// Called with rates received from ratekeeper
	void updateRates(TransactionTagMap<double> const& newRates);

	// elapsed indicates the amount of time since the last epoch was run.
	// If a request is ready to be executed, it is sent to the deque
	// corresponding to its priority. If not, the request remains queued.
	void releaseTransactions(double elapsed,
	                         SpannedDeque<GetReadVersionRequest>& outBatchPriority,
	                         SpannedDeque<GetReadVersionRequest>& outDefaultPriority);

	// Queues a tagged request for throttling. Only the request's first tag is used;
	// multiple tags trigger a SevWarnAlways trace event.
	void addRequest(GetReadVersionRequest const&);

public: // testing
	// Returns number of tags tracked
	uint32_t size();
};

View File

@ -0,0 +1,69 @@
/*
* GrvTransactionRateInfo.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbrpc/Smoother.h"
// Used by GRV Proxy to enforce rate limits received from the Ratekeeper.
//
// Between waits, the GrvTransactionRateInfo executes a "release window" starting
// with a call to the startReleaseWindow method. Within this release window, transactions are
// released while canStart returns true. At the end of the release window, the
// endReleaseWindow method is called, and the budget is updated to add or
// remove capacity.
//
// Meanwhile, the desired rate is updated through the setRate method.
//
// Smoothers are used to avoid turbulent throttling behaviour.
class GrvTransactionRateInfo {
	// Most recent target transaction rate (set through setRate)
	double rate = 0.0;
	// Transactions releasable in the current release window, computed by
	// startReleaseWindow; may be negative if earlier windows over-released
	double limit{ 0.0 };
	// Carried-over capacity, allowing batches larger than one window's limit to
	// eventually start (see endReleaseWindow)
	double budget{ 0.0 };
	// True initially and after disable(); cleared by setRate
	bool disabled{ true };
	// Smoothed target rate
	Smoother smoothRate;
	// Smoothed rate of transactions actually released
	Smoother smoothReleased;

public:
	explicit GrvTransactionRateInfo(double rate = 0.0);

	// Determines the number of transactions that this proxy is allowed to release
	// in this release window.
	void startReleaseWindow();

	// Checks if a "count" new transactions can be released, given that
	// "numAlreadyStarted" transactions have already been released in the
	// current release window.
	bool canStart(int64_t numAlreadyStarted, int64_t count) const;

	// Updates the budget to accumulate any extra capacity available or remove any excess that was used.
	// Call at the end of a release window.
	void endReleaseWindow(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed);

	// Smoothly sets rate. If currently disabled, reenable
	void setRate(double rate);

	// Smoothly sets transaction rate to 0. Call disable when new rates have not been
	// set for a sufficiently long period of time.
	void disable();

	double getRate() const { return rate; }
	double getLimit() const { return limit; }
};

View File

@ -81,7 +81,7 @@ struct GetRateInfoReply {
// Depending on the value of SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES,
// one of these fields may be populated
Optional<PrioritizedTransactionTagMap<ClientTagThrottleLimits>> clientThrottledTags;
Optional<PrioritizedTransactionTagMap<double>> proxyThrottledTags;
Optional<TransactionTagMap<double>> proxyThrottledTags;
template <class Ar>
void serialize(Ar& ar) {

View File

@ -42,7 +42,7 @@ public:
// For each tag and priority combination, return the throughput limit for the cluster
// (to be shared across all GRV proxies)
virtual PrioritizedTransactionTagMap<double> getProxyRates(int numProxies) = 0;
virtual TransactionTagMap<double> getProxyRates(int numProxies) = 0;
virtual int64_t autoThrottleCount() const = 0;
virtual uint32_t busyReadTagCount() const = 0;
@ -66,7 +66,7 @@ public:
void addRequests(TransactionTag tag, int count) override;
uint64_t getThrottledTagChangeId() const override;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
PrioritizedTransactionTagMap<double> getProxyRates(int numProxies) override { throw not_implemented(); }
TransactionTagMap<double> getProxyRates(int numProxies) override { throw not_implemented(); }
int64_t autoThrottleCount() const override;
uint32_t busyReadTagCount() const override;
uint32_t busyWriteTagCount() const override;
@ -94,7 +94,7 @@ public:
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
PrioritizedTransactionTagMap<double> getProxyRates(int numProxies) override;
TransactionTagMap<double> getProxyRates(int numProxies) override;
// Testing only:
public:

View File

@ -2863,20 +2863,6 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
}
}
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "ChangeFeedMutationsDone", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("FirstVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.front().version)
.detail("LastVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.back().version)
.detail("Count", reply.mutations.size())
.detail("GotAll", gotAll)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
if (DEBUG_CF_MISSING(req.rangeID, req.range, req.begin, reply.mutations.back().version) && !req.canReadPopped) {
bool foundVersion = false;
bool foundKey = false;
@ -2928,6 +2914,21 @@ ACTOR Future<std::pair<ChangeFeedStreamReply, bool>> getChangeFeedMutations(Stor
reply.popVersion = feedInfo->emptyVersion + 1;
if (DEBUG_CF_TRACE) {
TraceEvent(SevDebug, "ChangeFeedMutationsDone", data->thisServerID)
.detail("FeedID", req.rangeID)
.detail("StreamUID", streamUID)
.detail("Range", req.range)
.detail("Begin", req.begin)
.detail("End", req.end)
.detail("FirstVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.front().version)
.detail("LastVersion", reply.mutations.empty() ? invalidVersion : reply.mutations.back().version)
.detail("PopVersion", reply.popVersion)
.detail("Count", reply.mutations.size())
.detail("GotAll", gotAll)
.detail("PeerAddr", req.reply.getEndpoint().getPrimaryAddress());
}
// If the SS's version advanced at all during any of the waits, the read from memory may have missed some
// mutations, so gotAll can only be true if data->version didn't change over the course of this actor
return std::make_pair(reply, gotAll);

View File

@ -2727,8 +2727,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
f.cancel();
state Error e = err;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled ||
e.code() == error_code_please_reboot_delete;
e.code() == error_code_please_reboot_delete || e.code() == error_code_local_config_changed;
endRole(Role::WORKER, interf.id(), "WorkerError", ok, e);
errorForwarders.clear(false);
sharedLogs.clear();

View File

@ -104,7 +104,7 @@ struct AtomicRestoreWorkload : TestWorkload {
deterministicRandom()->randomInt(0, 100),
BackupAgentBase::getDefaultTagName(),
self->backupRanges,
false,
SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone::False,
self->usePartitionedLogs));
} catch (Error& e) {

View File

@ -225,7 +225,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
deterministicRandom()->randomInt(0, 100),
tag.toString(),
backupRanges,
false,
SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone{ !stopDifferentialDelay },
self->usePartitionedLogs));
} catch (Error& e) {
@ -485,7 +485,7 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
self->backupRanges,
false,
SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone::True,
UsePartitionedLog::False);
} catch (Error& e) {

View File

@ -46,7 +46,6 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
bool allowPauses;
bool shareLogRange;
bool shouldSkipRestoreRanges;
bool enableBackupEncryption;
bool defaultBackup;
Optional<std::string> encryptionKeyFileName;
@ -62,7 +61,6 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
backupTag = getOption(options, "backupTag"_sr, BackupAgentBase::getDefaultTag());
backupRangesCount = getOption(options, "backupRangesCount"_sr, 5);
backupRangeLengthMax = getOption(options, "backupRangeLengthMax"_sr, 1);
enableBackupEncryption = getOption(options, "enableBackupEncryption"_sr, false);
abortAndRestartAfter =
getOption(options,
"abortAndRestartAfter"_sr,
@ -343,7 +341,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
deterministicRandom()->randomInt(0, 2000),
tag.toString(),
backupRanges,
self->enableBackupEncryption && SERVER_KNOBS->ENABLE_ENCRYPTION,
SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone{ !stopDifferentialDelay },
UsePartitionedLog::False,
IncrementalBackupOnly::False,
@ -597,16 +595,15 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
if (!self->locked && BUGGIFY) {
TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag));
try {
extraBackup =
backupAgent.submitBackup(cx,
"file://simfdb/backups/"_sr,
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
self->backupRanges,
self->enableBackupEncryption && SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone::True);
extraBackup = backupAgent.submitBackup(cx,
"file://simfdb/backups/"_sr,
{},
deterministicRandom()->randomInt(0, 60),
deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(),
self->backupRanges,
SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone::True);
} catch (Error& e) {
TraceEvent("BARW_SubmitBackup2Exception", randomID)
.error(e)

View File

@ -66,7 +66,7 @@ struct BackupToBlobWorkload : TestWorkload {
self->snapshotInterval,
self->backupTag.toString(),
backupRanges,
false));
SERVER_KNOBS->ENABLE_ENCRYPTION));
EBackupState backupStatus = wait(backupAgent.waitBackup(cx, self->backupTag.toString(), StopWhenDone::True));
TraceEvent("BackupToBlob_BackupStatus").detail("Status", BackupAgentBase::getStateText(backupStatus));
return Void();

View File

@ -44,7 +44,8 @@ class ConfigIncrementWorkload : public TestWorkload {
if (!serializedValue.present()) {
return 0;
} else {
int value = BinaryReader::fromStringRef<int>(serializedValue.get(), Unversioned());
Tuple t = Tuple::unpack(serializedValue.get());
int value = t.getInt(0);
te.detail("Value", value);
return value;
}

View File

@ -159,7 +159,7 @@ struct IncrementalBackupWorkload : TestWorkload {
1e8,
self->tag.toString(),
backupRanges,
false,
SERVER_KNOBS->ENABLE_ENCRYPTION,
StopWhenDone::False,
UsePartitionedLog::False,
IncrementalBackupOnly::True));

View File

@ -63,7 +63,7 @@ struct SubmitBackupWorkload final : TestWorkload {
self->snapshotInterval,
self->tag.toString(),
backupRanges,
false,
SERVER_KNOBS->ENABLE_ENCRYPTION,
self->stopWhenDone,
UsePartitionedLog::False,
self->incremental));

View File

@ -85,8 +85,8 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;
init( TRACING_SPAN_ATTRIBUTES_ENABLED, false ); // Additional K/V and tenant data added to Span Attributes
init( TRACING_SAMPLE_RATE, 0.0 ); // Fraction of distributed traces (not spans) to sample (0 means ignore all traces)
init( TRACING_UDP_LISTENER_ADDR, "127.0.0.1" ); // Only applicable if TracerType is set to a network option
init( TRACING_SAMPLE_RATE, 0.0); // Fraction of distributed traces (not spans) to sample (0 means ignore all traces)
init( TRACING_UDP_LISTENER_ADDR, "127.0.0.1"); // Only applicable if TracerType is set to a network option
init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option
//connectionMonitor
@ -229,7 +229,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( ZERO_LENGTH_FILE_PAD, 1 );
init( TRACE_FLUSH_INTERVAL, 0.25 );
init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10, Atomic::NO ); // Related to the trace severity in Trace.h
init( MAX_TRACE_SUPPRESSIONS, 1e4 );
init( TRACE_DATETIME_ENABLED, true ); // trace time in human readable format (always real time)
init( TRACE_SYNC_ENABLED, 0 );
@ -245,7 +245,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
//TDMetrics
init( MAX_METRICS, 600 );
init( MAX_METRIC_SIZE, 2500 );
init( MAX_METRIC_SIZE, 2500, Atomic::NO );
init( MAX_METRIC_LEVEL, 25 );
init( METRIC_LEVEL_DIVISOR, log(4) );
init( METRIC_LIMIT_START_QUEUE_SIZE, 10 ); // The queue size at which to start restricting logging by disabling levels

View File

@ -403,6 +403,9 @@ if(WITH_PYTHON)
if (USE_VALGRIND)
create_valgrind_correctness_package()
endif()
if (ENABLE_LONG_RUNNING_TESTS)
create_long_running_correctness_package()
endif()
endif()
if (NOT OPEN_FOR_IDE AND NOT WIN32 AND NOT USE_SANITIZER)

View File

@ -1,6 +1,7 @@
[configuration]
allowDefaultTenant = false
allowDisablingTenants = false
tenantModeRequired = true
[[knobs]]
enable_encryption = true
@ -32,7 +33,6 @@ simBackupAgents = 'BackupToFile'
[[test.workload]]
testName = 'BackupAndRestoreCorrectness'
enableBackupEncryption = true
defaultBackup = true
backupAfter = 10.0
restoreAfter = 60.0