Merge branch 'main' of github.com:apple/foundationdb into jfu-mako-active-tenants

Jon Fu 2022-06-27 14:20:50 -07:00
commit 2dccbd7e70
157 changed files with 883 additions and 572 deletions


@ -105,15 +105,12 @@ set(FDB_PACKAGE_NAME "${FDB_MAJOR}.${FDB_MINOR}")
configure_file(${CMAKE_SOURCE_DIR}/versions.target.cmake ${CMAKE_CURRENT_BINARY_DIR}/versions.target)
file(WRITE ${CMAKE_BINARY_DIR}/version.txt ${FDB_VERSION})
message(STATUS "FDB version is ${FDB_VERSION}")
message(STATUS "FDB package name is ${FDB_PACKAGE_NAME}")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fdbclient/versions.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/fdbclient/versions.h)
################################################################################
# Flow
################################################################################
include(utils)
# Flow and other tools are written in C# - so we need that dependency
include(EnableCsharp)
@ -203,12 +200,6 @@ if(CMAKE_SYSTEM_NAME STREQUAL "FreeBSD")
add_link_options(-lexecinfo)
endif()
################################################################################
# Build information
################################################################################
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fdbclient/BuildFlags.h.in ${CMAKE_CURRENT_BINARY_DIR}/fdbclient/BuildFlags.h)
################################################################################
# process compile commands for IDE
################################################################################


@ -180,7 +180,7 @@ if(NOT WIN32)
add_dependencies(disconnected_timeout_unit_tests doctest)
target_include_directories(fdb_c_setup_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests_version_510 PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests_version_510 PUBLIC ${DOCTEST_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/flow/include)
target_include_directories(disconnected_timeout_unit_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_link_libraries(fdb_c_setup_tests PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_unit_tests PRIVATE fdb_c Threads::Threads fdbclient rapidjson)
@ -188,18 +188,19 @@ if(NOT WIN32)
target_link_libraries(trace_partial_file_suffix_test PRIVATE fdb_c Threads::Threads flow)
target_link_libraries(disconnected_timeout_unit_tests PRIVATE fdb_c Threads::Threads)
if(USE_SANITIZER)
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c fdb_cpp toml11_target Threads::Threads fmt::fmt boost_asan)
else()
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c fdb_cpp toml11_target Threads::Threads fmt::fmt boost_target)
endif()
if(USE_SANITIZER)
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c fdb_cpp toml11_target Threads::Threads fmt::fmt boost_asan)
else()
target_link_libraries(fdb_c_api_tester PRIVATE fdb_c fdb_cpp toml11_target Threads::Threads fmt::fmt boost_target)
endif()
target_include_directories(fdb_c_api_tester PRIVATE "${CMAKE_BINARY_DIR}/flow/include")
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
if (USE_SANITIZER)
target_link_libraries(mako PRIVATE fdb_c fdbclient fmt::fmt Threads::Threads fdb_cpp boost_asan)
target_link_libraries(mako PRIVATE fdb_c fdbclient fmt::fmt Threads::Threads fdb_cpp boost_asan rapidjson)
else()
target_link_libraries(mako PRIVATE fdb_c fdbclient fmt::fmt Threads::Threads fdb_cpp boost_target)
target_link_libraries(mako PRIVATE fdb_c fdbclient fmt::fmt Threads::Threads fdb_cpp boost_target rapidjson)
endif()
if(NOT OPEN_FOR_IDE)
@ -330,7 +331,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.1.5" "7.2.0"
--upgrade-path "6.3.23" "7.0.0" "7.1.9" "7.2.0"
--process-number 1
)
@ -338,7 +339,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadSingleThr.toml
--upgrade-path "7.0.0" "7.1.5" "7.2.0"
--upgrade-path "7.0.0" "7.1.9" "7.2.0"
--process-number 1
)
@ -346,7 +347,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.23" "7.0.0" "7.1.5" "7.2.0" "7.1.5"
--upgrade-path "6.3.23" "7.0.0" "7.1.9" "7.2.0" "7.1.9"
--process-number 3
)
@ -354,7 +355,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "7.1.5" "7.2.0" "7.1.5"
--upgrade-path "7.0.0" "7.1.9" "7.2.0" "7.1.9"
--process-number 3
)
@ -362,7 +363,7 @@ endif()
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.1.5" "7.2.0" "7.1.5"
--upgrade-path "7.1.9" "7.2.0" "7.1.9"
--process-number 3
)
@ -376,15 +377,25 @@ endif()
--redundancy double
)
add_test(NAME fdb_c_wiggle_and_upgrade
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.0.0" "wiggle" "7.2.0"
--disable-log-dump
--process-number 3
--redundancy double
)
add_test(NAME fdb_c_wiggle_and_upgrade_latest
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "7.1.9" "wiggle" "7.2.0"
--disable-log-dump
--process-number 3
--redundancy double
)
add_test(NAME fdb_c_wiggle_and_upgrade_63
COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
--build-dir ${CMAKE_BINARY_DIR}
--test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
--upgrade-path "6.3.24" "wiggle" "7.0.0"
--disable-log-dump
--process-number 3
--redundancy double
)
endif()


@ -35,10 +35,10 @@
#include "operations.hpp"
#include "time.hpp"
#include "ddsketch.hpp"
#include "contrib/rapidjson/rapidjson/document.h"
#include "contrib/rapidjson/rapidjson/rapidjson.h"
#include "contrib/rapidjson/rapidjson/stringbuffer.h"
#include "contrib/rapidjson/rapidjson/writer.h"
#include "rapidjson/document.h"
#include "rapidjson/rapidjson.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include <iostream>
#include <sstream>
#include <vector>


@ -16,10 +16,11 @@ set(SRCS
fdb_flow.actor.cpp
fdb_flow.h)
add_flow_target(STATIC_LIBRARY NAME fdb_flow SRCS ${SRCS})
add_flow_target(STATIC_LIBRARY NAME fdb_flow SRCS ${SRCS} NO_COPY_HDR)
target_link_libraries(fdb_flow PUBLIC fdb_c)
target_link_libraries(fdb_flow PUBLIC fdbclient)
target_include_directories(fdb_flow PUBLIC
"${CMAKE_SOURCE_DIR}"
"${CMAKE_CURRENT_BINARY_DIR}"
"${CMAKE_CURRENT_SOURCE_DIR}"
"${CMAKE_CURRENT_SOURCE_DIR}/tester"


@ -24,7 +24,7 @@
#include <stdio.h>
#include <cinttypes>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "flow/DeterministicRandom.h"
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"


@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "Tester.actor.h"
#include "tester/Tester.actor.h"
#include <cinttypes>
#ifdef __linux__
#include <string.h>


@ -22,7 +22,7 @@
// version.
#if defined(NO_INTELLISENSE) && !defined(FDB_FLOW_TESTER_TESTER_ACTOR_G_H)
#define FDB_FLOW_TESTER_TESTER_ACTOR_G_H
#include "Tester.actor.g.h"
#include "tester/Tester.actor.g.h"
#elif !defined(FDB_FLOW_TESTER_TESTER_ACTOR_H)
#define FDB_FLOW_TESTER_TESTER_ACTOR_H


@ -620,6 +620,15 @@ def tenants(logger):
assert lines[0].strip().startswith('id: ')
assert lines[1].strip().startswith('prefix: ')
output = run_fdbcli_command('gettenant tenant JSON')
json_output = json.loads(output, strict=False)
assert(len(json_output) == 2)
assert('tenant' in json_output)
assert(json_output['type'] == 'success')
assert(len(json_output['tenant']) == 2)
assert('id' in json_output['tenant'])
assert('prefix' in json_output['tenant'])
output = run_fdbcli_command('usetenant')
assert output == 'Using the default tenant'


@ -64,9 +64,6 @@ add_compile_definitions(BOOST_ERROR_CODE_HEADER_ONLY BOOST_SYSTEM_NO_DEPRECATED)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
include_directories(${CMAKE_SOURCE_DIR})
include_directories(${CMAKE_BINARY_DIR})
if(WIN32)
add_definitions(-DBOOST_USE_WINDOWS_H)
add_definitions(-DWIN32_LEAN_AND_MEAN)


@ -147,9 +147,50 @@ function(strip_debug_symbols target)
add_dependencies(strip_targets strip_${target})
endfunction()
# This will copy the header from a flow target into ${CMAKE_BINARY_DIR}/include/target-name
# We're doing this to enforce proper dependencies. In the past we simply added the source
# and binary dir to the include list, which means that for example a compilation unit in
# flow could include a header file that lives in fdbserver. This is a somewhat hacky solution
# but due to our directory structure it seems to be the least invasive one.
function(copy_headers)
set(options)
set(oneValueArgs NAME OUT_DIR INC_DIR)
set(multiValueArgs SRCS)
cmake_parse_arguments(CP "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
get_filename_component(dir_name ${CMAKE_CURRENT_SOURCE_DIR} NAME)
set(include_dir "${CMAKE_CURRENT_BINARY_DIR}/include")
set(incl_dir "${include_dir}/${dir_name}")
make_directory("${incl_dir}")
foreach(f IN LISTS CP_SRCS)
is_prefix(bd "${CMAKE_CURRENT_BINARY_DIR}" "${f}")
is_prefix(sd "${CMAKE_CURRENT_SOURCE_DIR}" "${f}")
if (bd OR sd)
continue()
endif()
is_header(hdr "${f}")
if(NOT hdr)
continue()
endif()
get_filename_component(fname ${f} NAME)
get_filename_component(dname ${f} DIRECTORY)
if (dname)
make_directory(${incl_dir}/${dname})
endif()
set(fpath "${incl_dir}/${dname}/${fname}")
add_custom_command(OUTPUT "${fpath}"
DEPENDS "${f}"
COMMAND "${CMAKE_COMMAND}" -E copy "${f}" "${fpath}"
WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}")
list(APPEND out_files "${fpath}")
endforeach()
add_custom_target("${CP_NAME}_incl" DEPENDS ${out_files})
set("${CP_OUT_DIR}" "${incl_dir}" PARENT_SCOPE)
set("${CP_INC_DIR}" ${include_dir} PARENT_SCOPE)
endfunction()
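For illustration, a minimal sketch of how copy_headers is meant to be consumed; the mylib target and file names are hypothetical, and the wiring mirrors what add_flow_target does below with its "<NAME>_incl" target and INC_DIR result variable:
# Hypothetical consumer of copy_headers -- not part of this commit.
set(MYLIB_SRCS MyLib.h Util.hpp MyLib.cpp)
copy_headers(NAME mylib SRCS "${MYLIB_SRCS}" OUT_DIR out_dir INC_DIR inc_dir)
add_library(mylib STATIC ${MYLIB_SRCS})
add_dependencies(mylib mylib_incl)                     # mylib_incl is the copy target created by copy_headers
target_include_directories(mylib PUBLIC "${inc_dir}")  # dependents now #include "<this-dir-name>/MyLib.h"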
function(add_flow_target)
set(options EXECUTABLE STATIC_LIBRARY
DYNAMIC_LIBRARY)
DYNAMIC_LIBRARY NO_COPY_HDR)
set(oneValueArgs NAME)
set(multiValueArgs SRCS COVERAGE_FILTER_OUT DISABLE_ACTOR_DIAGNOSTICS ADDL_SRCS)
cmake_parse_arguments(AFT "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
@ -159,42 +200,81 @@ function(add_flow_target)
if(NOT AFT_SRCS)
message(FATAL_ERROR "No sources provided")
endif()
if(NOT AFT_NO_COPY_HDR)
copy_headers(NAME ${AFT_NAME} SRCS "${AFT_SRCS};${AFT_DISABLE_ACTOR_DIAGNOSTICS}" OUT_DIR incl_dir INC_DIR include_dir)
endif()
#foreach(src IN LISTS AFT_SRCS)
# is_header(h "${src}")
# if(NOT h)
# list(SRCS "${CMAKE_CURRENT_SOURCE_DIR}/${src}")
# endif()
#endforeach()
if(OPEN_FOR_IDE)
# Intentionally omit ${AFT_DISABLE_ACTOR_DIAGNOSTICS} since we don't want diagnostics
set(sources ${AFT_SRCS} ${AFT_ADDL_SRCS})
add_library(${AFT_NAME} OBJECT ${sources})
else()
foreach(src IN LISTS AFT_SRCS AFT_DISABLE_ACTOR_DIAGNOSTICS)
set(actor_compiler_flags "")
is_header(hdr ${src})
set(in_filename "${src}")
if(${src} MATCHES ".*\\.actor\\.(h|cpp)")
list(APPEND actors ${src})
list(APPEND actor_compiler_flags "--generate-probes")
set(is_actor_file YES)
if(${src} MATCHES ".*\\.h")
string(REPLACE ".actor.h" ".actor.g.h" generated ${src})
string(REPLACE ".actor.h" ".actor.g.h" out_filename ${in_filename})
else()
string(REPLACE ".actor.cpp" ".actor.g.cpp" generated ${src})
string(REPLACE ".actor.cpp" ".actor.g.cpp" out_filename ${in_filename})
endif()
else()
set(is_actor_file NO)
set(out_filename "${src}")
endif()
if(hdr AND NOT AFT_NO_COPY_HDR)
set(in_file "${incl_dir}/${in_filename}")
set(out_file "${incl_dir}/${out_filename}")
else()
set(in_file "${CMAKE_CURRENT_SOURCE_DIR}/${in_filename}")
if(is_actor_file)
set(out_file "${CMAKE_CURRENT_BINARY_DIR}/${out_filename}")
else()
set(out_file "${in_file}")
endif()
endif()
is_prefix(in_src_dir "${CMAKE_CURRENT_SOURCE_DIR}" ${src})
is_prefix(in_bin_dir "${CMAKE_CURRENT_BINARY_DIR}" ${src})
if(NOT AFT_NO_COPY_HDR)
is_prefix(in_incl_dir "${incl_dir}" ${src})
endif()
if(in_src_dir OR in_bin_dir)
set(in_file "${src}")
set(out_file "${src}")
endif()
list(APPEND sources ${out_file})
set(actor_compiler_flags "")
if(is_actor_file)
list(APPEND actors ${in_file})
list(APPEND actor_compiler_flags "--generate-probes")
foreach(s IN LISTS AFT_DISABLE_ACTOR_DIAGNOSTICS)
if("${s}" STREQUAL "${src}")
list(APPEND actor_compiler_flags "--disable-diagnostics")
break()
endif()
endforeach()
list(APPEND sources ${generated})
list(APPEND generated_files ${CMAKE_CURRENT_BINARY_DIR}/${generated})
list(APPEND generated_files ${out_file})
if(WIN32)
add_custom_command(OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${generated}"
COMMAND $<TARGET_FILE:actorcompiler> "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags}
DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/${src}" ${actor_exe}
add_custom_command(OUTPUT "${out_file}"
COMMAND $<TARGET_FILE:actorcompiler> "${in_file}" "${out_file}" ${actor_compiler_flags}
DEPENDS "${in_file}" ${actor_exe}
COMMENT "Compile actor: ${src}")
else()
add_custom_command(OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${generated}"
COMMAND ${MONO_EXECUTABLE} ${actor_exe} "${CMAKE_CURRENT_SOURCE_DIR}/${src}" "${CMAKE_CURRENT_BINARY_DIR}/${generated}" ${actor_compiler_flags} > /dev/null
DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/${src}" ${actor_exe}
add_custom_command(OUTPUT "${out_file}"
COMMAND ${MONO_EXECUTABLE} ${actor_exe} "${in_file}" "${out_file}" ${actor_compiler_flags} > /dev/null
DEPENDS "${in_file}" ${actor_exe}
COMMENT "Compile actor: ${src}")
endif()
else()
list(APPEND sources ${src})
endif()
endforeach()
if(AFT_EXECUTABLE)
@ -227,7 +307,10 @@ function(add_flow_target)
set_property(TARGET ${AFT_NAME} PROPERTY COVERAGE_FILTERS ${AFT_SRCS})
add_custom_target(${AFT_NAME}_actors DEPENDS ${generated_files})
add_dependencies(${AFT_NAME}_actors actorcompiler)
if(NOT AFT_NO_COPY_HDR)
target_include_directories("${AFT_NAME}" PUBLIC "${include_dir}")
add_dependencies(${AFT_NAME}_actors actorcompiler "${AFT_NAME}_incl")
endif()
add_dependencies(${AFT_NAME} ${AFT_NAME}_actors)
if(NOT WIN32)
assert_no_version_h(${AFT_NAME}_actors)

cmake/utils.cmake (new file, 31 lines)

@ -0,0 +1,31 @@
# sets out_var to YES if filename has extension .h or .hpp, NO otherwise
function(is_header out_var filename)
set(res "NO")
get_filename_component(ext "${filename}" LAST_EXT)
if((ext STREQUAL ".h") OR (ext STREQUAL ".hpp"))
set(res "YES")
endif()
set("${out_var}" "${res}" PARENT_SCOPE)
endfunction()
function(remove_prefix out prefix str)
string(LENGTH "${prefix}" len)
string(SUBSTRING "${str}" ${len} -1 res)
set("${out}" "${res}" PARENT_SCOPE)
endfunction()
function(is_prefix out prefix str)
string(LENGTH "${prefix}" plen)
string(LENGTH "${str}" slen)
if(plen GREATER slen)
set(res NO)
else()
string(SUBSTRING "${str}" 0 ${plen} pstr)
if(pstr STREQUAL prefix)
set(res YES)
else()
set(res NO)
endif()
endif()
set(${out} ${res} PARENT_SCOPE)
endfunction()
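To make the intended semantics concrete, a short illustrative use of these helpers; the paths and result variable names are made up:
# Illustrative only -- not part of this commit.
is_header(hdr "flow/Arena.h")                        # hdr     -> YES
is_header(hdr2 "flow/Arena.cpp")                     # hdr2    -> NO
is_prefix(in_flow "/src/flow" "/src/flow/Arena.h")   # in_flow -> YES
remove_prefix(rel "/src/flow/" "/src/flow/Arena.h")  # rel     -> "Arena.h"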


@ -234,12 +234,27 @@ Note that :ref:`characters can be escaped <cli-escaping>` when specifying keys (
gettenant
---------
The ``gettenant`` command fetches metadata for a given tenant and displays it. Its syntax is ``gettenant <TENANT_NAME>``.
The ``gettenant`` command fetches metadata for a given tenant and displays it. Its syntax is ``gettenant <TENANT_NAME> [JSON]``.
Included in the output of this command are the ``id`` and ``prefix`` assigned to the tenant. If the tenant does not exist, ``fdbcli`` will report an error.
Included in the output of this command are the ``id`` and ``prefix`` assigned to the tenant. If the tenant does not exist, ``fdbcli`` will report an error. If ``JSON`` is specified, then the output will be written as a JSON document::
{
"tenant": {
"id": 0,
"prefix": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
},
"type": "success"
}
In the event of an error, the output will include an error message::
{
"error": "...",
"type": "error"
}
getversion
----------
The ``getversion`` command fetches the current read version of the cluster or currently running transaction.


@ -4,6 +4,18 @@
Release Notes
#############
7.1.11
======
* Same as 7.1.10 release with AVX enabled.
7.1.10
======
* Released with AVX disabled.
* Fixed a sequencer crash when DC ID is a string. `(PR #7393) <https://github.com/apple/foundationdb/pull/7393>`_
* Fixed a client performance regression by removing unnecessary transaction initialization. `(PR #7365) <https://github.com/apple/foundationdb/pull/7365>`_
* Safely removed fdb_transaction_get_range_and_flat_map C API. `(PR #7379) <https://github.com/apple/foundationdb/pull/7379>`_
* Fixed an unknown error bug when hostname resolving fails. `(PR #7380) <https://github.com/apple/foundationdb/pull/7380>`_
7.1.9
=====
* Same as 7.1.8 release with AVX enabled.
@ -15,7 +27,7 @@ Release Notes
* Added RSS bytes for processes in status json output and corrected available_bytes calculation. `(PR #7348) <https://github.com/apple/foundationdb/pull/7348>`_
* Added versionstamp support in tuples. `(PR #7313) <https://github.com/apple/foundationdb/pull/7313>`_
* Fixed some spammy trace events. `(PR #7300) <https://github.com/apple/foundationdb/pull/7300>`_
* Fixed a memory corruption bug for using streaming peeks. `(PR #7288) <https://github.com/apple/foundationdb/pull/7288>`_
* Avoided a memory corruption bug by disabling streaming peeks. `(PR #7288) <https://github.com/apple/foundationdb/pull/7288>`_
* Fixed a hang bug in fdbcli exclude command. `(PR #7268) <https://github.com/apple/foundationdb/pull/7268>`_
* Fixed an issue that a remote TLog blocks peeks. `(PR #7255) <https://github.com/apple/foundationdb/pull/7255>`_
* Fixed a connection issue using hostnames. `(PR #7264) <https://github.com/apple/foundationdb/pull/7264>`_


@ -19,7 +19,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "flow/flow.h"
#include "flow/Platform.h"
#include "flow/DeterministicRandom.h"


@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbbackup/BackupTLSConfig.h"
#include "fdbclient/JsonBuilder.h"
#include "flow/Arena.h"


@ -19,7 +19,7 @@
*/
#include "boost/lexical_cast.hpp"
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/IClientApi.h"


@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbcli/fdbcli.actor.h"


@ -21,7 +21,7 @@
#include <cinttypes>
#include "boost/lexical_cast.hpp"
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbcli/fdbcli.actor.h"


@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbcli/fdbcli.actor.h"


@ -209,11 +209,12 @@ CommandFactory listTenantsFactory(
// gettenant command
ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() != 2) {
if (tokens.size() < 2 || tokens.size() > 3 || (tokens.size() == 3 && tokens[2] != "JSON"_sr)) {
printUsage(tokens[0]);
return false;
}
state bool useJson = tokens.size() == 3;
state Key tenantNameKey = fdb_cli::tenantSpecialKeyRange.begin.withSuffix(tokens[1]);
state Reference<ITransaction> tr = db->createTransaction();
@ -228,30 +229,58 @@ ACTOR Future<bool> getTenantCommandActor(Reference<IDatabase> db, std::vector<St
json_spirit::mValue jsonObject;
json_spirit::read_string(tenant.get().toString(), jsonObject);
JSONDoc doc(jsonObject);
int64_t id;
std::string prefix;
doc.get("id", id);
doc.get("prefix", prefix);
if (useJson) {
json_spirit::mObject resultObj;
resultObj["tenant"] = jsonObject;
resultObj["type"] = "success";
printf("%s\n",
json_spirit::write_string(json_spirit::mValue(resultObj), json_spirit::pretty_print).c_str());
} else {
JSONDoc doc(jsonObject);
int64_t id;
std::string prefix;
doc.get("id", id);
doc.get("prefix", prefix);
printf(" id: %" PRId64 "\n", id);
printf(" prefix: %s\n", printable(prefix).c_str());
}
printf(" id: %" PRId64 "\n", id);
printf(" prefix: %s\n", printable(prefix).c_str());
return true;
} catch (Error& e) {
state Error err(e);
if (e.code() == error_code_special_keys_api_failure) {
std::string errorMsgStr = wait(fdb_cli::getSpecialKeysFailureErrorMessage(tr));
fprintf(stderr, "ERROR: %s\n", errorMsgStr.c_str());
return false;
try {
wait(safeThreadFutureToFuture(tr->onError(e)));
} catch (Error& finalErr) {
state std::string errorStr;
if (finalErr.code() == error_code_special_keys_api_failure) {
std::string str = wait(getSpecialKeysFailureErrorMessage(tr));
errorStr = str;
} else if (useJson) {
errorStr = finalErr.what();
} else {
throw finalErr;
}
if (useJson) {
json_spirit::mObject resultObj;
resultObj["type"] = "error";
resultObj["error"] = errorStr;
printf(
"%s\n",
json_spirit::write_string(json_spirit::mValue(resultObj), json_spirit::pretty_print).c_str());
} else {
fprintf(stderr, "ERROR: %s\n", errorStr.c_str());
}
}
wait(safeThreadFutureToFuture(tr->onError(err)));
}
}
}
CommandFactory getTenantFactory("gettenant",
CommandHelp("gettenant <TENANT_NAME>",
"prints the metadata for a tenant",
"Prints the metadata for a tenant."));
CommandFactory getTenantFactory(
"gettenant",
CommandHelp("gettenant <TENANT_NAME> [JSON]",
"prints the metadata for a tenant",
"Prints the metadata for a tenant. If JSON is specified, then the output will be in JSON format."));
} // namespace fdb_cli


@ -19,7 +19,7 @@
*/
#include "boost/lexical_cast.hpp"
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/ClusterConnectionFile.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/FDBTypes.h"


@ -20,7 +20,7 @@
#include "flow/flow.h"
#include "flow/singleton.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "fdbclient/ActorLineageProfiler.h"
#include "fdbclient/NameLineage.h"
#include <msgpack.hpp>


@ -31,10 +31,10 @@
#include <sstream>
#include <time.h>
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/serialize.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/IRateControl.h"
#include "flow/IRateControl.h"
#include "fdbclient/S3BlobStore.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/libb64/encode.h"


@ -86,10 +86,10 @@ public:
addTask([promise, func, priority] {
try {
auto funcResult = func();
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, priority);
} catch (Error& e) {
TraceEvent("ErrorExecutingAsyncTask").error(e);
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
onMainThreadVoid([promise, e] { promise.sendError(e); }, priority);
}
});
return promise.getFuture();


@ -29,7 +29,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/TaskBucket.h"
#include "fdbclient/Notified.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "fdbclient/KeyBackedTypes.h"
#include <ctime>
#include <climits>


@ -22,7 +22,7 @@
#include <ostream>
// FIXME: Trim this down
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "flow/Platform.actor.h"
#include "fdbclient/AsyncTaskThread.h"
#include "fdbclient/BackupContainer.h"
@ -37,7 +37,9 @@
#include "fdbrpc/simulator.h"
#include "flow/Platform.h"
#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#ifdef BUILD_AZURE_BACKUP
#include "fdbclient/BackupContainerAzureBlobStore.h"
#endif
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbclient/BackupContainerS3BlobStore.h"


@ -23,7 +23,7 @@
#pragma once
#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ReadYourWrites.h"


@ -19,7 +19,9 @@
*/
#include "fdbclient/BackupAgent.actor.h"
#ifdef BUILD_AZURE_BACKUP
#include "fdbclient/BackupContainerAzureBlobStore.h"
#endif
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbclient/BackupContainerS3BlobStore.h"


@ -22,7 +22,7 @@
#define FDBCLIENT_BACKUP_CONTAINER_FILESYSTEM_H
#pragma once
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/FDBTypes.h"
#include "flow/Trace.h"


@ -20,7 +20,7 @@
#include "fdbclient/BackupContainerLocalDirectory.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/Platform.actor.h"
#include "flow/Platform.h"
#include "fdbrpc/simulator.h"


@ -20,7 +20,7 @@
#include <vector>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "flow/serialize.h"
#include "fdbclient/BlobGranuleFiles.h"
#include "fdbclient/Knobs.h"


@ -21,7 +21,7 @@
#include <map>
#include <vector>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/AsyncFileS3BlobStore.actor.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/BlobGranuleFiles.h"


@ -1,6 +1,7 @@
set(FDBCLIENT_SRCS
ActorLineageProfiler.h
ActorLineageProfiler.cpp
AnnotateActor.h
AnnotateActor.cpp
AsyncFileS3BlobStore.actor.cpp
AsyncFileS3BlobStore.actor.h
@ -67,12 +68,15 @@ set(FDBCLIENT_SRCS
GlobalConfig.actor.h
GlobalConfig.actor.cpp
GrvProxyInterface.h
HTTP.h
HTTP.actor.cpp
HighContentionPrefixAllocator.actor.h
IClientApi.h
IConfigTransaction.cpp
IConfigTransaction.h
ISingleThreadTransaction.cpp
ISingleThreadTransaction.h
JSONDoc.h
JsonBuilder.cpp
JsonBuilder.h
KeyBackedTypes.h
@ -103,9 +107,14 @@ set(FDBCLIENT_SRCS
PaxosConfigTransaction.actor.cpp
PaxosConfigTransaction.h
PImpl.h
ProcessInterface.h
SimpleConfigTransaction.actor.cpp
SpecialKeySpace.actor.cpp
SpecialKeySpace.actor.h
RESTClient.h
RESTClient.actor.cpp
RESTUtils.h
RESTUtils.actor.cpp
ReadYourWrites.actor.cpp
ReadYourWrites.h
RestoreInterface.cpp
@ -113,6 +122,7 @@ set(FDBCLIENT_SRCS
RunTransaction.actor.h
RYWIterator.cpp
RYWIterator.h
S3BlobStore.h
S3BlobStore.actor.cpp
Schemas.cpp
Schemas.h
@ -147,6 +157,9 @@ set(FDBCLIENT_SRCS
TestKnobCollection.h
ThreadSafeTransaction.cpp
ThreadSafeTransaction.h
Tracing.h
Tracing.actor.cpp
TransactionLineage.h
Tuple.cpp
Tuple.h
VersionedMap.actor.h
@ -156,7 +169,6 @@ set(FDBCLIENT_SRCS
Versionstamp.h
VersionVector.h
VersionVector.cpp
WellKnownEndpoints.h
WriteMap.h
WriteMap.cpp
json_spirit/json_spirit_error_position.h
@ -164,18 +176,49 @@ set(FDBCLIENT_SRCS
json_spirit/json_spirit_value.h
json_spirit/json_spirit_writer_options.h
json_spirit/json_spirit_writer_template.h
libb64/encode.h
libb64/decode.h
libb64/cdecode.h
libb64/cdecode.c
libb64/cencode.h
libb64/cencode.c
md5/md5.h
md5/md5.c
rapidxml/rapidxml.hpp
rapidxml/rapidxml_iterators.hpp
rapidxml/rapidxml_print.hpp
rapidxml/rapidxml_utils.hpp
sha1/SHA1.h
sha1/SHA1.cpp
zipf.c
zipf.h)
message(STATUS "FDB version is ${FDB_VERSION}")
message(STATUS "FDB package name is ${FDB_PACKAGE_NAME}")
set(options_srcs ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp)
vexillographer_compile(TARGET fdboptions LANG cpp OUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g
make_directory(${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/)
vexillographer_compile(TARGET fdboptions_vex LANG cpp OUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.h ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.cpp)
add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/FDBOptions.g.h
DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.h
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_BINARY_DIR}/FDBOptions.g.h ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/FDBOptions.g.h)
vexillographer_compile(TARGET fdboptions_c LANG c OUT ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/fdb_c_options.g.h
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/fdb_c_options.g.h)
add_custom_target(fdboptions DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/FDBOptions.g.h)
add_dependencies(fdboptions fdboptions_c)
################################################################################
# Build information
################################################################################
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/BuildFlags.h.in ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/BuildFlags.h)
set(BUILD_AZURE_BACKUP OFF CACHE BOOL "Build Azure backup client")
if(BUILD_AZURE_BACKUP)
add_compile_definitions(BUILD_AZURE_BACKUP)
@ -224,6 +267,7 @@ if(WITH_AWS_BACKUP)
endif()
add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/versions.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/include/fdbclient/versions.h)
add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc msgpack)


@ -24,7 +24,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Knobs.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
// The versioned message has wire format : -1, version, messages
static const int32_t VERSION_HEADER = -1;


@ -27,7 +27,7 @@
#include "fdbrpc/Locality.h"
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/ClusterInterface.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbrpc/WellKnownEndpoints.h"
#include "flow/Hostname.h"
const int MAX_CLUSTER_FILE_BYTES = 60000;
@ -46,6 +46,8 @@ struct ClientLeaderRegInterface {
bool operator==(const ClientLeaderRegInterface& rhs) const {
return getLeader == rhs.getLeader && openDatabase == rhs.openDatabase;
}
std::string getAddressString() const;
};
// A string containing the information necessary to connect to a cluster.


@ -26,7 +26,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include <ctime>
#include <climits>
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/genericactors.actor.h"
#include "flow/Hash3.h"
#include <numeric>


@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbclient/DatabaseContext.h"
@ -33,7 +33,7 @@
#include <cinttypes>
#include <ctime>
#include <climits>
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/genericactors.actor.h"
#include "flow/Hash3.h"
#include <numeric>


@ -18,10 +18,12 @@
* limitations under the License.
*/
#include "fdbrpc/HTTP.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/md5/md5.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/libb64/encode.h"
#include "fdbclient/Knobs.h"
#include <cctype>
#include "flow/actorcompiler.h" // has to be last include


@ -25,8 +25,7 @@
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/IRateControl.h"
#include "fdbclient/Knobs.h"
#include "flow/IRateControl.h"
namespace HTTP {
struct is_iless {


@ -27,7 +27,7 @@
#include "fdbclient/FDBTypes.h"
#include "fdbclient/Tenant.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#include "flow/ThreadHelper.actor.h"
struct VersionVector;
@ -101,8 +101,8 @@ public:
// @todo This API and the "getSpanContext()" API may help with debugging simulation
// test failures. (These APIs are not currently invoked anywhere.) Remove them
// later if they are not really needed.
virtual VersionVector getVersionVector() = 0;
virtual SpanContext getSpanContext() = 0;
virtual ThreadFuture<VersionVector> getVersionVector() = 0;
virtual ThreadFuture<SpanContext> getSpanContext() = 0;
virtual ThreadFuture<int64_t> getApproximateSize() = 0;
virtual void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) = 0;


@ -22,7 +22,7 @@
#include <string>
#include <vector>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/Knobs.h"
#include "flow/Arena.h"
#include "fdbclient/ClusterConnectionMemoryRecord.h"


@ -485,6 +485,14 @@ ClientLeaderRegInterface::ClientLeaderRegInterface(INetwork* local) {
TaskPriority::Coordination);
}
std::string ClientLeaderRegInterface::getAddressString() const {
if (hostname.present()) {
return hostname.get().toString();
} else {
return getLeader.getEndpoint().getPrimaryAddress().toString();
}
}
// Nominee is the worker among all workers that are considered as leader by one coordinator
// This function contacts a coordinator coord to ask who is its nominee.
ACTOR Future<Void> monitorNominee(Key key,
@ -510,9 +518,7 @@ ACTOR Future<Void> monitorNominee(Key key,
TraceEvent("GetLeaderReply")
.suppressFor(1.0)
.detail("Coordinator",
coord.hostname.present() ? coord.hostname.get().toString()
: coord.getLeader.getEndpoint().getPrimaryAddress().toString())
.detail("Coordinator", coord.getAddressString())
.detail("Nominee", li.present() ? li.get().changeID : UID())
.detail("ClusterKey", key.printable());
@ -581,6 +587,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
state Future<Void> allActors;
state Optional<std::pair<LeaderInfo, bool>> leader;
nominees.resize(coordinators.clientLeaderServers.size());
@ -594,7 +601,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
allActors = waitForAll(actors);
loop {
Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
leader = getLeader(nominees);
TraceEvent("MonitorLeaderChange")
.detail("NewLeader", leader.present() ? leader.get().first.changeID : UID(1, 1));
if (leader.present()) {
@ -615,7 +622,7 @@ ACTOR Future<MonitorLeaderInfo> monitorLeaderOneGeneration(Reference<IClusterCon
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
wait(connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()));
info.intermediateConnRecord = connRecord;
}
@ -871,6 +878,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
state std::vector<UID> lastGrvProxyUIDs;
state std::vector<GrvProxyInterface> lastGrvProxies;
state std::vector<ClientLeaderRegInterface> clientLeaderServers;
state bool allConnectionsFailed = false;
clientLeaderServers.reserve(coordinatorsSize);
for (const auto& h : cs.hostnames) {
@ -896,7 +904,22 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
state ClusterConnectionString storedConnectionString;
if (connRecord) {
bool upToDate = wait(connRecord->upToDate(storedConnectionString));
if (!upToDate) {
if (upToDate) {
incorrectTime = Optional<double>();
} else if (allConnectionsFailed) {
// Failed to connect to all coordinators from the current connection string,
// so it is not possible to get any new updates from the cluster. It can be that
// all the coordinators have changed, but the client missed that, because it had
// an incompatible protocol version. Since the cluster file is different,
// it may have been updated by other clients.
TraceEvent("UpdatingConnectionStringFromFile")
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", storedConnectionString.toString())
.detail("CurrentConnectionString", connRecord->getConnectionString().toString());
wait(connRecord->setAndPersistConnectionString(storedConnectionString));
info.intermediateConnRecord = connRecord;
return info;
} else {
req.issues.push_back_deep(req.issues.arena(), LiteralStringRef("incorrect_cluster_file_contents"));
std::string connectionString = connRecord->getConnectionString().toString();
if (!incorrectTime.present()) {
@ -909,8 +932,6 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
.detail("ClusterFile", connRecord->toString())
.detail("StoredConnectionString", storedConnectionString.toString())
.detail("CurrentConnectionString", connectionString);
} else {
incorrectTime = Optional<double>();
}
} else {
incorrectTime = Optional<double>();
@ -953,7 +974,7 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
.detail("CurrentConnectionString",
info.intermediateConnRecord->getConnectionString().toString());
}
connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString());
wait(connRecord->setAndPersistConnectionString(info.intermediateConnRecord->getConnectionString()));
info.intermediateConnRecord = connRecord;
}
@ -964,11 +985,16 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration(
shrinkProxyList(ni, lastCommitProxyUIDs, lastCommitProxies, lastGrvProxyUIDs, lastGrvProxies);
clientInfo->setUnconditional(ni);
successIndex = index;
allConnectionsFailed = false;
} else {
TEST(rep.getError().code() == error_code_failed_to_progress); // Coordinator can't talk to cluster controller
TEST(rep.getError().code() == error_code_lookup_failed); // Coordinator hostname resolving failure
TraceEvent("MonitorProxiesConnectFailed")
.detail("Error", rep.getError().name())
.detail("Coordinator", clientLeaderServer.getAddressString());
index = (index + 1) % coordinatorsSize;
if (index == successIndex) {
allConnectionsFailed = true;
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));
}
}


@ -202,7 +202,7 @@ public:
auto sav = (DLThreadSingleAssignmentVar<T>*)param;
if (MultiVersionApi::api->callbackOnMainThread) {
onMainThreadVoid([sav]() { sav->apply(); }, nullptr);
onMainThreadVoid([sav]() { sav->apply(); });
} else {
sav->apply();
}


@ -385,7 +385,7 @@ void DLTransaction::reset() {
api->transactionReset(tr);
}
VersionVector DLTransaction::getVersionVector() {
ThreadFuture<VersionVector> DLTransaction::getVersionVector() {
return VersionVector(); // not implemented
}
@ -1136,7 +1136,7 @@ Version MultiVersionTransaction::getCommittedVersion() {
return invalidVersion;
}
VersionVector MultiVersionTransaction::getVersionVector() {
ThreadFuture<VersionVector> MultiVersionTransaction::getVersionVector() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getVersionVector();
@ -1145,7 +1145,7 @@ VersionVector MultiVersionTransaction::getVersionVector() {
return VersionVector();
}
SpanContext MultiVersionTransaction::getSpanContext() {
ThreadFuture<SpanContext> MultiVersionTransaction::getSpanContext() {
auto tr = getTransaction();
if (tr.transaction) {
return tr.transaction->getSpanContext();
@ -1461,8 +1461,7 @@ MultiVersionDatabase::MultiVersionDatabase(MultiVersionApi* api,
});
Reference<DatabaseState> dbStateRef = dbState;
onMainThreadVoid([dbStateRef]() { dbStateRef->protocolVersionMonitor = dbStateRef->monitorProtocolVersion(); },
nullptr);
onMainThreadVoid([dbStateRef]() { dbStateRef->protocolVersionMonitor = dbStateRef->monitorProtocolVersion(); });
}
}
@ -1629,7 +1628,7 @@ ThreadFuture<Void> MultiVersionDatabase::DatabaseState::monitorProtocolVersion()
ProtocolVersion clusterVersion =
!cv.isError() ? cv.get() : self->dbProtocolVersion.orDefault(currentProtocolVersion);
onMainThreadVoid([self, clusterVersion]() { self->protocolVersionChanged(clusterVersion); }, nullptr);
onMainThreadVoid([self, clusterVersion]() { self->protocolVersionChanged(clusterVersion); });
return ErrorOr<Void>(Void());
});
}
@ -1693,10 +1692,10 @@ void MultiVersionDatabase::DatabaseState::protocolVersionChanged(ProtocolVersion
dbReady = mapThreadFuture<Void, Void>(
newDb.castTo<DLDatabase>()->onReady(), [self, newDb, client](ErrorOr<Void> ready) {
if (!ready.isError()) {
onMainThreadVoid([self, newDb, client]() { self->updateDatabase(newDb, client); }, nullptr);
onMainThreadVoid([self, newDb, client]() { self->updateDatabase(newDb, client); });
} else {
onMainThreadVoid([self, client]() { self->updateDatabase(Reference<IDatabase>(), client); },
nullptr);
onMainThreadVoid(
[self, client]() { self->updateDatabase(Reference<IDatabase>(), client); });
}
return ready;
@ -1806,19 +1805,17 @@ void MultiVersionDatabase::DatabaseState::startLegacyVersionMonitors() {
// Cleans up state for the legacy version monitors to break reference cycles
void MultiVersionDatabase::DatabaseState::close() {
Reference<DatabaseState> self = Reference<DatabaseState>::addRef(this);
onMainThreadVoid(
[self]() {
self->closed = true;
if (self->protocolVersionMonitor.isValid()) {
self->protocolVersionMonitor.cancel();
}
for (auto monitor : self->legacyVersionMonitors) {
monitor->close();
}
onMainThreadVoid([self]() {
self->closed = true;
if (self->protocolVersionMonitor.isValid()) {
self->protocolVersionMonitor.cancel();
}
for (auto monitor : self->legacyVersionMonitors) {
monitor->close();
}
self->legacyVersionMonitors.clear();
},
nullptr);
self->legacyVersionMonitors.clear();
});
}
// Starts the connection monitor by creating a database object at an old version.
@ -1838,22 +1835,20 @@ void MultiVersionDatabase::LegacyVersionMonitor::startConnectionMonitor(
Reference<LegacyVersionMonitor> self = Reference<LegacyVersionMonitor>::addRef(this);
versionMonitor =
mapThreadFuture<Void, Void>(db.castTo<DLDatabase>()->onReady(), [self, dbState](ErrorOr<Void> ready) {
onMainThreadVoid(
[self, ready, dbState]() {
if (ready.isError()) {
if (ready.getError().code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FailedToOpenDatabaseOnClient")
.error(ready.getError())
.detail("LibPath", self->client->libPath);
onMainThreadVoid([self, ready, dbState]() {
if (ready.isError()) {
if (ready.getError().code() != error_code_operation_cancelled) {
TraceEvent(SevError, "FailedToOpenDatabaseOnClient")
.error(ready.getError())
.detail("LibPath", self->client->libPath);
self->client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
}
} else {
self->runGrvProbe(dbState);
}
},
nullptr);
self->client->failed = true;
MultiVersionApi::api->updateSupportedVersions();
}
} else {
self->runGrvProbe(dbState);
}
});
return ready;
});
@ -1868,12 +1863,10 @@ void MultiVersionDatabase::LegacyVersionMonitor::runGrvProbe(Reference<MultiVers
versionMonitor = mapThreadFuture<Version, Void>(tr->getReadVersion(), [self, dbState](ErrorOr<Version> v) {
// If the version attempt returns an error, we regard that as a connection (except operation_cancelled)
if (!v.isError() || v.getError().code() != error_code_operation_cancelled) {
onMainThreadVoid(
[self, dbState]() {
self->monitorRunning = false;
dbState->protocolVersionChanged(self->client->protocolVersion);
},
nullptr);
onMainThreadVoid([self, dbState]() {
self->monitorRunning = false;
dbState->protocolVersionChanged(self->client->protocolVersion);
});
}
return v.map<Void>([](Version v) { return Void(); });
@ -2123,11 +2116,9 @@ void MultiVersionApi::setSupportedClientVersions(Standalone<StringRef> versions)
// This option must be set on the main thread because it modifies structures that can be used concurrently by the
// main thread
onMainThreadVoid(
[this, versions]() {
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
},
nullptr);
onMainThreadVoid([this, versions]() {
localClient->api->setNetworkOption(FDBNetworkOptions::SUPPORTED_CLIENT_VERSIONS, versions);
});
if (!bypassMultiClientApi) {
runOnExternalClientsAllThreads([versions](Reference<ClientInfo> client) {
@ -2896,7 +2887,7 @@ THREAD_FUNC runSingleAssignmentVarTest(void* arg) {
checkUndestroyed.blockUntilReady();
}
onMainThreadVoid([done]() { *done = true; }, nullptr);
onMainThreadVoid([done]() { *done = true; });
} catch (Error& e) {
printf("Caught error in test: %s\n", e.name());
*done = true;


@ -23,7 +23,7 @@
#include "flow/ProtocolVersion.h"
#pragma once
#include "bindings/c/foundationdb/fdb_c_options.g.h"
#include "fdbclient/fdb_c_options.g.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/IClientApi.h"
@ -395,8 +395,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override { return SpanContext(); };
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override { return SpanContext(); };
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;
@ -588,8 +588,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override;
ThreadFuture<int64_t> getApproximateSize() override;
void setOption(FDBTransactionOptions::Option option, Optional<StringRef> value = Optional<StringRef>()) override;


@ -31,7 +31,7 @@
#include <vector>
#include "boost/algorithm/string.hpp"
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/FDBTypes.h"
@ -63,7 +63,7 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/TransactionLineage.h"
#include "fdbclient/versions.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbrpc/WellKnownEndpoints.h"
#include "fdbrpc/LoadBalance.h"
#include "fdbrpc/Net2FileSystem.h"
#include "fdbrpc/simulator.h"
@ -82,7 +82,7 @@
#include "flow/Platform.h"
#include "flow/SystemMonitor.h"
#include "flow/TLSConfig.actor.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#include "flow/UnitTest.h"
#include "flow/network.h"
#include "flow/serialize.h"
@ -9588,3 +9588,27 @@ int64_t getMaxWriteKeySize(KeyRef const& key, bool hasRawAccess) {
int64_t getMaxClearKeySize(KeyRef const& key) {
return getMaxKeySize(key);
}
namespace NativeAPI {
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
Transaction* tr) {
state Future<std::vector<ProcessData>> workers = getWorkers(tr);
state Future<RangeResult> serverList = tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
wait(success(workers) && success(serverList));
ASSERT(!serverList.get().more && serverList.get().size() < CLIENT_KNOBS->TOO_MANY);
std::map<Optional<Standalone<StringRef>>, ProcessData> id_data;
for (int i = 0; i < workers.get().size(); i++)
id_data[workers.get()[i].locality.processId()] = workers.get()[i];
std::vector<std::pair<StorageServerInterface, ProcessClass>> results;
for (int i = 0; i < serverList.get().size(); i++) {
auto ssi = decodeServerListValue(serverList.get()[i].value);
results.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
}
return results;
}
} // namespace NativeAPI


@ -20,7 +20,7 @@
#pragma once
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_G_H
#include "fdbclient/NativeAPI.actor.g.h"
@ -556,5 +556,9 @@ int64_t getMaxWriteKeySize(KeyRef const& key, bool hasRawAccess);
// Returns the maximum legal size of a key that can be cleared. Keys larger than this will be assumed not to exist.
int64_t getMaxClearKeySize(KeyRef const& key);
namespace NativeAPI {
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
Transaction* tr);
}
#include "flow/unactorcompiler.h"
#endif


@ -21,7 +21,7 @@
#include "fdbclient/AnnotateActor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbrpc/WellKnownEndpoints.h"
struct ProcessInterface {
constexpr static FileIdentifier file_identifier = 985636;


@ -18,11 +18,11 @@
* limitations under the License.
*/
#include "fdbrpc/RESTClient.h"
#include "fdbclient/RESTClient.h"
#include "fdbrpc/HTTP.h"
#include "fdbrpc/IRateControl.h"
#include "fdbrpc/RESTUtils.h"
#include "fdbclient/HTTP.h"
#include "flow/IRateControl.h"
#include "fdbclient/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/FastRef.h"


@ -25,8 +25,8 @@
#pragma once
#include "fdbclient/JSONDoc.h"
#include "fdbrpc/HTTP.h"
#include "fdbrpc/RESTUtils.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/RESTUtils.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/flow.h"


@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/RESTUtils.h"
#include "fdbclient/RESTUtils.h"
#include "flow/flat_buffers.h"
#include "flow/UnitTest.h"
@ -268,4 +268,4 @@ TEST_CASE("/RESTUtils/ValidURIWithParams") {
ASSERT_EQ(r.resource.compare("foo/bar"), 0);
ASSERT_EQ(r.reqParameters.compare("param1,param2"), 0);
return Void();
}
}


@ -37,11 +37,13 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string.hpp>
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/Hostname.h"
#include "flow/UnitTest.h"
#include "fdbclient/rapidxml/rapidxml.hpp"
#ifdef BUILD_AWS_BACKUP
#include "fdbclient/FDBAWSCredentialsProvider.h"
#endif
#include "flow/actorcompiler.h" // has to be last include


@ -25,8 +25,8 @@
#include "flow/flow.h"
#include "flow/Net2Packet.h"
#include "fdbclient/Knobs.h"
#include "fdbrpc/IRateControl.h"
#include "fdbrpc/HTTP.h"
#include "flow/IRateControl.h"
#include "fdbclient/HTTP.h"
#include "fdbclient/JSONDoc.h"
// Representation of all the things you need to connect to a blob store instance with some credentials.


@ -307,7 +307,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( CLEAR_TIME_ESTIMATE, .00005 );
init( COMMIT_TIME_ESTIMATE, .005 );
init( CHECK_FREE_PAGE_AMOUNT, 100 ); if( randomize && BUGGIFY ) CHECK_FREE_PAGE_AMOUNT = 5;
init( DISK_METRIC_LOGGING_INTERVAL, 5.0 );
init( SOFT_HEAP_LIMIT, 300e6 );
init( SQLITE_PAGE_SCAN_ERROR_LIMIT, 10000 );


@ -267,7 +267,6 @@ public:
double CLEAR_TIME_ESTIMATE;
double COMMIT_TIME_ESTIMATE;
int CHECK_FREE_PAGE_AMOUNT;
double DISK_METRIC_LOGGING_INTERVAL;
int64_t SOFT_HEAP_LIMIT;
int SQLITE_PAGE_SCAN_ERROR_LIMIT;


@ -24,7 +24,6 @@
#include "fdbclient/DatabaseContext.h"
#include "fdbclient/IKnobCollection.h"
#include "fdbclient/SimpleConfigTransaction.h"
#include "fdbserver/Knobs.h"
#include "flow/Arena.h"
#include "flow/actorcompiler.h" // This must be the last #include.


@ -349,13 +349,7 @@ ACTOR Future<Optional<StatusObject>> clientCoordinatorsStatusFetcher(Reference<I
int coordinatorsUnavailable = 0;
for (int i = 0; i < leaderServers.size(); i++) {
StatusObject coordStatus;
if (coord.clientLeaderServers[i].hostname.present()) {
coordStatus["address"] = coord.clientLeaderServers[i].hostname.get().toString();
} else {
coordStatus["address"] =
coord.clientLeaderServers[i].getLeader.getEndpoint().getPrimaryAddress().toString();
}
coordStatus["address"] = coord.clientLeaderServers[i].getAddressString();
if (leaderServers[i].isReady()) {
coordStatus["reachable"] = true;
} else {


@ -35,7 +35,7 @@
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbclient/Tenant.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#include "flow/UnitTest.h"
#include "fdbclient/VersionVector.h"


@ -27,7 +27,7 @@
#pragma once
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "flow/Error.h"
#include "flow/flow.h"
#include "flow/network.h"


@ -78,7 +78,8 @@ void ThreadSafeDatabase::setOption(FDBDatabaseOptions::Option option, Optional<S
db->checkDeferredError();
db->setOption(option, passValue.contents());
},
&db->deferredError);
db,
&DatabaseContext::deferredError);
}
ThreadFuture<int64_t> ThreadSafeDatabase::rebootWorker(const StringRef& address, bool check, int duration) {
@ -109,7 +110,7 @@ ThreadFuture<DatabaseSharedState*> ThreadSafeDatabase::createSharedState() {
void ThreadSafeDatabase::setSharedState(DatabaseSharedState* p) {
DatabaseContext* db = this->db;
onMainThreadVoid([db, p]() { db->setSharedState(p); }, nullptr);
onMainThreadVoid([db, p]() { db->setSharedState(p); });
}
// Return the main network thread busyness
@ -149,24 +150,22 @@ ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion)
// but run its constructor on the main thread
DatabaseContext* db = this->db = DatabaseContext::allocateOnForeignThread();
onMainThreadVoid(
[db, connFile, apiVersion]() {
try {
Database::createDatabase(
Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::False, LocalityData(), db)
.extractPtr();
} catch (Error& e) {
new (db) DatabaseContext(e);
} catch (...) {
new (db) DatabaseContext(unknown_error());
}
},
nullptr);
onMainThreadVoid([db, connFile, apiVersion]() {
try {
Database::createDatabase(
Reference<ClusterConnectionFile>(connFile), apiVersion, IsInternal::False, LocalityData(), db)
.extractPtr();
} catch (Error& e) {
new (db) DatabaseContext(e);
} catch (...) {
new (db) DatabaseContext(unknown_error());
}
});
}
ThreadSafeDatabase::~ThreadSafeDatabase() {
DatabaseContext* db = this->db;
onMainThreadVoid([db]() { db->delref(); }, nullptr);
onMainThreadVoid([db]() { db->delref(); });
}
Reference<ITransaction> ThreadSafeTenant::createTransaction() {
@ -194,7 +193,7 @@ ThreadSafeTenant::~ThreadSafeTenant() {}
ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
ISingleThreadTransaction::Type type,
Optional<TenantName> tenant)
: tenantName(tenant) {
: tenantName(tenant), initialized(std::make_shared<std::atomic_bool>(false)) {
// Allocate memory for the transaction from this thread (so the pointer is known for subsequent method calls)
// but run its constructor on the main thread
@ -203,29 +202,30 @@ ThreadSafeTransaction::ThreadSafeTransaction(DatabaseContext* cx,
// immediately after this call, it will defer the DatabaseContext::delref (and onMainThread preserves the order of
// these operations).
auto tr = this->tr = ISingleThreadTransaction::allocateOnForeignThread(type);
auto init = this->initialized;
// No deferred error -- if the construction of the RYW transaction fails, we have nowhere to put it
onMainThreadVoid(
[tr, cx, type, tenant]() {
cx->addref();
if (tenant.present()) {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx), tenant.get());
} else {
tr->construct(Database(cx), tenant.get());
}
} else {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx));
} else {
tr->construct(Database(cx));
}
}
},
nullptr);
onMainThreadVoid([tr, cx, type, tenant, init]() {
cx->addref();
if (tenant.present()) {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx), tenant.get());
} else {
tr->construct(Database(cx), tenant.get());
}
} else {
if (type == ISingleThreadTransaction::Type::RYW) {
new (tr) ReadYourWritesTransaction(Database(cx));
} else {
tr->construct(Database(cx));
}
}
*init = true;
});
}
// This constructor is only used while refactoring fdbcli and only called from the main thread
ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw) : tr(ryw) {
ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw)
: tr(ryw), initialized(std::make_shared<std::atomic_bool>(true)) {
if (tr)
tr->addref();
}
@ -233,17 +233,17 @@ ThreadSafeTransaction::ThreadSafeTransaction(ReadYourWritesTransaction* ryw) : t
ThreadSafeTransaction::~ThreadSafeTransaction() {
ISingleThreadTransaction* tr = this->tr;
if (tr)
onMainThreadVoid([tr]() { tr->delref(); }, nullptr);
onMainThreadVoid([tr]() { tr->delref(); });
}
void ThreadSafeTransaction::cancel() {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr]() { tr->cancel(); }, nullptr);
onMainThreadVoid([tr]() { tr->cancel(); });
}
void ThreadSafeTransaction::setVersion(Version v) {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, v]() { tr->setVersion(v); }, &tr->deferredError);
onMainThreadVoid([tr, v]() { tr->setVersion(v); }, tr, &ISingleThreadTransaction::deferredError);
}
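The error-reporting argument to onMainThreadVoid changes shape here: instead of a raw &tr->deferredError pointer, callers now pass the owning object and a pointer-to-member, tr and &ISingleThreadTransaction::deferredError. The sketch below shows one plausible way such an overload could route a failure from the queued lambda into that member; it is an assumption about the helper's shape, not the actual flow/ThreadHelper.actor.h implementation, and ToyError stands in for flow's Error.

// Hypothetical sketch of a pointer-to-member deferred-error overload; the queued work
// runs inline here instead of being scheduled on the network thread.
#include <utility>

struct ToyError {
    int code = 0;
};

template <class F, class T>
void onMainThreadVoidSketch(F&& func, T* object, ToyError T::*deferredError) {
    auto task = [func = std::forward<F>(func), object, deferredError]() {
        try {
            func();
        } catch (ToyError& e) {
            // Record the first failure on the owning object so a later check can surface it.
            if (object != nullptr && (object->*deferredError).code == 0)
                object->*deferredError = e;
        }
    };
    task(); // placeholder for "enqueue on the main/network thread"
}

In the setVersion call above, the object would be the transaction and the member presumably the same deferredError that checkDeferredError() inspects later in this file.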
ThreadFuture<Version> ThreadSafeTransaction::getReadVersion() {
@ -402,12 +402,12 @@ void ThreadSafeTransaction::addReadConflictRange(const KeyRangeRef& keys) {
KeyRange r = keys;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, r]() { tr->addReadConflictRange(r); }, &tr->deferredError);
onMainThreadVoid([tr, r]() { tr->addReadConflictRange(r); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::makeSelfConflicting() {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr]() { tr->makeSelfConflicting(); }, &tr->deferredError);
onMainThreadVoid([tr]() { tr->makeSelfConflicting(); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::atomicOp(const KeyRef& key, const ValueRef& value, uint32_t operationType) {
@ -415,7 +415,9 @@ void ThreadSafeTransaction::atomicOp(const KeyRef& key, const ValueRef& value, u
Value v = value;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, k, v, operationType]() { tr->atomicOp(k, v, operationType); }, &tr->deferredError);
onMainThreadVoid([tr, k, v, operationType]() { tr->atomicOp(k, v, operationType); },
tr,
&ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::set(const KeyRef& key, const ValueRef& value) {
@ -423,14 +425,14 @@ void ThreadSafeTransaction::set(const KeyRef& key, const ValueRef& value) {
Value v = value;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, k, v]() { tr->set(k, v); }, &tr->deferredError);
onMainThreadVoid([tr, k, v]() { tr->set(k, v); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::clear(const KeyRangeRef& range) {
KeyRange r = range;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, r]() { tr->clear(r); }, &tr->deferredError);
onMainThreadVoid([tr, r]() { tr->clear(r); }, tr, &ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::clear(const KeyRef& begin, const KeyRef& end) {
@ -445,14 +447,15 @@ void ThreadSafeTransaction::clear(const KeyRef& begin, const KeyRef& end) {
tr->clear(KeyRangeRef(b, e));
},
&tr->deferredError);
tr,
&ISingleThreadTransaction::deferredError);
}
void ThreadSafeTransaction::clear(const KeyRef& key) {
Key k = key;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, k]() { tr->clear(k); }, &tr->deferredError);
onMainThreadVoid([tr, k]() { tr->clear(k); }, tr, &ISingleThreadTransaction::deferredError);
}
ThreadFuture<Void> ThreadSafeTransaction::watch(const KeyRef& key) {
@ -469,7 +472,7 @@ void ThreadSafeTransaction::addWriteConflictRange(const KeyRangeRef& keys) {
KeyRange r = keys;
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, r]() { tr->addWriteConflictRange(r); }, &tr->deferredError);
onMainThreadVoid([tr, r]() { tr->addWriteConflictRange(r); }, tr, &ISingleThreadTransaction::deferredError);
}
ThreadFuture<Void> ThreadSafeTransaction::commit() {
@ -482,15 +485,20 @@ ThreadFuture<Void> ThreadSafeTransaction::commit() {
Version ThreadSafeTransaction::getCommittedVersion() {
// This should be thread safe when called legally, but it is fragile
if (!initialized || !*initialized) {
return ::invalidVersion;
}
return tr->getCommittedVersion();
}
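getCommittedVersion() now consults the new initialized flag, a std::shared_ptr<std::atomic_bool> that the constructor's main-thread lambda sets to true after construction finishes, and returns ::invalidVersion until then. A stripped-down sketch of that handshake, with illustrative names and the main-thread step reduced to an ordinary call:

// Minimal sketch of the cross-thread initialization flag; names are illustrative.
#include <atomic>
#include <memory>

struct CrossThreadHandle {
    std::shared_ptr<std::atomic_bool> initialized = std::make_shared<std::atomic_bool>(false);
    long long committedVersion = -1; // stands in for state that only the main thread may set

    void finishConstructionOnMainThread(long long v) {
        committedVersion = v;
        *initialized = true; // published last, once the object is fully set up
    }

    long long getCommittedVersion() const {
        // Until the main-thread constructor has run, report an invalid version rather
        // than reading a half-constructed object (analogous to ::invalidVersion above).
        if (!initialized || !*initialized)
            return -1;
        return committedVersion;
    }
};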
VersionVector ThreadSafeTransaction::getVersionVector() {
return tr->getVersionVector();
ThreadFuture<VersionVector> ThreadSafeTransaction::getVersionVector() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<VersionVector> { return tr->getVersionVector(); });
}
SpanContext ThreadSafeTransaction::getSpanContext() {
return tr->getSpanContext();
ThreadFuture<SpanContext> ThreadSafeTransaction::getSpanContext() {
ISingleThreadTransaction* tr = this->tr;
return onMainThread([tr]() -> Future<SpanContext> { return tr->getSpanContext(); });
}
ThreadFuture<int64_t> ThreadSafeTransaction::getApproximateSize() {
@ -513,7 +521,9 @@ void ThreadSafeTransaction::setOption(FDBTransactionOptions::Option option, Opti
Standalone<Optional<StringRef>> passValue = value;
// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
onMainThreadVoid([tr, option, passValue]() { tr->setOption(option, passValue.contents()); }, &tr->deferredError);
onMainThreadVoid([tr, option, passValue]() { tr->setOption(option, passValue.contents()); },
tr,
&ISingleThreadTransaction::deferredError);
}
ThreadFuture<Void> ThreadSafeTransaction::checkDeferredError() {
@ -541,16 +551,18 @@ Optional<TenantName> ThreadSafeTransaction::getTenant() {
void ThreadSafeTransaction::operator=(ThreadSafeTransaction&& r) noexcept {
tr = r.tr;
r.tr = nullptr;
initialized = std::move(r.initialized);
}
ThreadSafeTransaction::ThreadSafeTransaction(ThreadSafeTransaction&& r) noexcept {
tr = r.tr;
r.tr = nullptr;
initialized = std::move(r.initialized);
}
void ThreadSafeTransaction::reset() {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr]() { tr->reset(); }, nullptr);
onMainThreadVoid([tr]() { tr->reset(); });
}
extern const char* getSourceVersion();

View File

@ -170,8 +170,8 @@ public:
ThreadFuture<Void> commit() override;
Version getCommittedVersion() override;
VersionVector getVersionVector() override;
SpanContext getSpanContext() override;
ThreadFuture<VersionVector> getVersionVector() override;
ThreadFuture<SpanContext> getSpanContext() override;
ThreadFuture<int64_t> getApproximateSize() override;
ThreadFuture<uint64_t> getProtocolVersion();
@ -184,7 +184,7 @@ public:
Optional<TenantName> getTenant() override;
// These are to permit use as state variables in actors:
ThreadSafeTransaction() : tr(nullptr) {}
ThreadSafeTransaction() : tr(nullptr), initialized(std::make_shared<std::atomic_bool>(false)) {}
void operator=(ThreadSafeTransaction&& r) noexcept;
ThreadSafeTransaction(ThreadSafeTransaction&& r) noexcept;
@ -196,6 +196,7 @@ public:
private:
ISingleThreadTransaction* tr;
const Optional<TenantName> tenantName;
std::shared_ptr<std::atomic_bool> initialized;
};
// An implementation of IClientApi that serializes operations onto the network thread and interacts with the lower-level

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
#include "flow/Knobs.h"

View File

@ -20,9 +20,11 @@
#pragma once
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/FlowTransport.h"
#include "flow/network.h"
#include "flow/IRandom.h"
#include "flow/Arena.h"
#include "fdbrpc/FlowTransport.h"
#include "fdbclient/FDBTypes.h"
#include <unordered_set>
#include <atomic>

View File

@ -1,6 +1,7 @@
set(FDBMONITOR_SRCS fdbmonitor.cpp)
add_executable(fdbmonitor ${FDBMONITOR_SRCS})
target_include_directories(fdbmonitor PUBLIC "${CMAKE_BINARY_DIR}/flow/include" "${CMAKE_BINARY_DIR}/fdbclient/include")
strip_debug_symbols(fdbmonitor)
assert_no_version_h(fdbmonitor)
if(UNIX AND NOT APPLE)

View File

@ -32,7 +32,7 @@
#include <type_traits>
#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/Knobs.h"
#include "flow/TDMetric.actor.h"
#include "flow/network.h"

View File

@ -22,7 +22,7 @@
#include "flow/flow.h"
#include "flow/serialize.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/network.h"
#include "flow/ActorCollection.h"

View File

@ -35,10 +35,10 @@
#include <fcntl.h>
#include <sys/stat.h>
#include "fdbrpc/libeio/eio.h"
#include "eio.h"
#include "flow/flow.h"
#include "flow/ThreadHelper.actor.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/TDMetric.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -538,7 +538,7 @@ private:
static void eio_want_poll() {
want_poll = 1;
// SOMEDAY: nullptr for deferred error, no analysis of correctness (itp)
onMainThreadVoid([]() { poll_eio(); }, nullptr, TaskPriority::PollEIO);
onMainThreadVoid([]() { poll_eio(); }, TaskPriority::PollEIO);
}
static int eio_callback(eio_req* req) {

View File

@ -20,7 +20,7 @@
#pragma once
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/FastRef.h"
#include "flow/flow.h"
#include "flow/IRandom.h"

View File

@ -29,7 +29,7 @@
#elif !defined(FLOW_ASYNCFILEKAIO_ACTOR_H)
#define FLOW_ASYNCFILEKAIO_ACTOR_H
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include <stdio.h>
#include <fcntl.h>
@ -37,7 +37,6 @@
#include <sys/eventfd.h>
#include <sys/syscall.h>
#include "fdbrpc/linux_kaio.h"
#include "fdbserver/Knobs.h"
#include "flow/Knobs.h"
#include "flow/Histogram.h"
#include "flow/UnitTest.h"
@ -649,7 +648,7 @@ private:
Reference<HistogramRegistry>(), "AsyncFileKAIO", "WriteLatency", Histogram::Unit::microseconds));
metrics.syncLatencyDist = Reference<Histogram>(new Histogram(
Reference<HistogramRegistry>(), "AsyncFileKAIO", "SyncLatency", Histogram::Unit::microseconds));
g_asyncFileKAIOHistogramLogger = histogramLogger(SERVER_KNOBS->DISK_METRIC_LOGGING_INTERVAL);
g_asyncFileKAIOHistogramLogger = histogramLogger(FLOW_KNOBS->DISK_METRIC_LOGGING_INTERVAL);
}
}

View File

@ -29,7 +29,7 @@
#define FLOW_ASYNCFILENONDURABLE_ACTOR_H
#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/TraceFileIO.h"

View File

@ -29,7 +29,7 @@
#define FDBRPC_ASYNCFILEREADAHEAD_ACTOR_H
#include "flow/flow.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/actorcompiler.h" // This must be the last #include.
// Read-only file type that wraps another file instance, reads in large blocks, and reads ahead of the actual range

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/crc32c.h"
#if VALGRIND

View File

@ -1,4 +1,5 @@
set(FDBRPC_SRCS
ActorFuzz.h
AsyncFileCached.actor.h
AsyncFileChaos.h
AsyncFileEIO.actor.h
@ -10,46 +11,64 @@ set(FDBRPC_SRCS
AsyncFileCached.actor.cpp
AsyncFileEncrypted.actor.cpp
AsyncFileNonDurable.actor.cpp
AsyncFileWriteChecker.h
AsyncFileWriteChecker.cpp
Base64UrlDecode.h
Base64UrlDecode.cpp
Base64UrlEncode.h
Base64UrlEncode.cpp
ContinuousSample.h
FailureMonitor.h
FailureMonitor.actor.cpp
fdbrpc.h
FlowProcess.actor.h
FlowTransport.h
FlowTransport.actor.cpp
genericactors.actor.h
genericactors.actor.cpp
HealthMonitor.h
HealthMonitor.actor.cpp
HTTP.actor.cpp
IAsyncFile.actor.cpp
IPAllowList.h
IPAllowList.cpp
LoadBalance.actor.cpp
linux_kaio.h
LoadBalance.h
LoadBalance.actor.h
LoadBalance.actor.cpp
Locality.h
Locality.cpp
MultiInterface.h
Net2FileSystem.h
Net2FileSystem.cpp
networksender.actor.h
PerfMetric.cpp
PerfMetric.h
QueueModel.h
QueueModel.cpp
RangeMap.h
Replication.h
ReplicationPolicy.h
ReplicationPolicy.cpp
ReplicationTypes.h
ReplicationTypes.cpp
ReplicationUtils.h
ReplicationUtils.cpp
RESTClient.h
RESTClient.actor.cpp
RESTUtils.h
RESTUtils.actor.cpp
SimExternalConnection.actor.cpp
SimExternalConnection.h
Smoother.h
Stats.actor.cpp
Stats.h
simulator.h
sim2.actor.cpp
sim_validation.h
sim_validation.cpp
TenantInfo.h
TimedRequest.h
TokenSign.h
TokenSign.cpp
TraceFileIO.h
TraceFileIO.cpp
TSSComparison.h)
TSSComparison.h
WellKnownEndpoints.h)
set(COMPILE_EIO OFF)
@ -105,6 +124,11 @@ if(${COROUTINE_IMPL} STREQUAL libcoro)
list(APPEND CORO_SRCS libcoroutine/context.c)
endif()
add_library(coro STATIC ${CORO_SRCS})
target_include_directories(coro PRIVATE ${CMAKE_BINARY_DIR}/flow/include)
target_include_directories(coro PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/libcoroutine)
# we don't want to link against flow, but we need to make sure it is built first
# since we rely on files being copied because we include Platform.h
add_dependencies(coro flow)
if(WIN32)
target_compile_definitions(coro PRIVATE USE_FIBERS)
else()
@ -114,6 +138,6 @@ if(${COROUTINE_IMPL} STREQUAL libcoro)
if(USE_VALGRIND)
target_link_libraries(coro PUBLIC Valgrind)
endif()
target_link_libraries(fdbrpc PRIVATE coro)
target_link_libraries(fdbrpc_sampling PRIVATE coro)
target_link_libraries(fdbrpc PUBLIC coro)
target_link_libraries(fdbrpc_sampling PUBLIC coro)
endif()

View File

@ -27,7 +27,7 @@
#include "flow/IThreadPool.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/TLSConfig.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -23,7 +23,7 @@
#include <string>
#pragma once
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
class Net2FileSystem final : public IAsyncFileSystem {
public:

View File

@ -25,7 +25,6 @@
#include <boost/range.hpp>
#include <thread>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/SimExternalConnection.h"
#include "flow/Net2Packet.h"
#include "flow/Platform.h"
@ -214,7 +213,8 @@ TEST_CASE("fdbrpc/SimExternalClient") {
// Wait until server is ready
threadSleep(0.01);
}
state Key data = deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, maxDataLength + 1));
state Standalone<StringRef> data =
deterministicRandom()->randomAlphaNumeric(deterministicRandom()->randomInt(0, maxDataLength + 1));
PacketWriter packetWriter(packetQueue.getWriteBuffer(data.size()), nullptr, Unversioned());
packetWriter.serializeBytes(data);
wait(externalConn->onWritable());

View File

@ -28,7 +28,7 @@
#include "flow/genericactors.actor.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbrpc/WellKnownEndpoints.h"
#include "flow/Hostname.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -22,7 +22,7 @@
#include <memory>
#include <string>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbrpc/simulator.h"
#include "flow/Arena.h"
#define BOOST_SYSTEM_NO_LIB
@ -35,7 +35,7 @@
#include "flow/ProtocolVersion.h"
#include "flow/Util.h"
#include "flow/WriteOnlySet.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "fdbrpc/AsyncFileCached.actor.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include "fdbrpc/AsyncFileNonDurable.actor.h"
@ -54,6 +54,16 @@
#include "flow/FaultInjection.h"
#include "flow/actorcompiler.h" // This must be the last #include.
ISimulator* g_pSimulator = nullptr;
thread_local ISimulator::ProcessInfo* ISimulator::currentProcess = nullptr;
ISimulator::ISimulator()
: desiredCoordinators(1), physicalDatacenters(1), processesPerMachine(0), listenersPerProcess(1), usableRegions(1),
allowLogSetKills(true), tssMode(TSSMode::Disabled), isStopped(false), lastConnectionFailure(0),
connectionFailuresDisableDuration(0), speedUpSimulation(false), backupAgents(BackupAgentType::WaitForType),
drAgents(BackupAgentType::WaitForType), allSwapsDisabled(false) {}
ISimulator::~ISimulator() = default;
bool simulator_should_inject_fault(const char* context, const char* file, int line, int error_code) {
if (!g_network->isSimulated() || !faultInjectionActivated)
return false;

View File

@ -29,7 +29,7 @@
#include "flow/Histogram.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "flow/TDMetric.actor.h"
#include <random>
#include "fdbrpc/ReplicationPolicy.h"
@ -419,7 +419,7 @@ public:
int listenersPerProcess;
std::set<NetworkAddress> protectedAddresses;
std::map<NetworkAddress, ProcessInfo*> currentlyRebootingProcesses;
std::unique_ptr<class ClusterConnectionString> extraDB;
class ClusterConnectionString* extraDB = nullptr;
Reference<IReplicationPolicy> storagePolicy;
Reference<IReplicationPolicy> tLogPolicy;
int32_t tLogWriteAntiQuorum;

View File

@ -20,7 +20,7 @@
#ifndef ART_MUTATION_BUFFER
#define ART_MUTATION_BUFFER
#include "art.h"
#include "fdbserver/art.h"
#include "flow/Arena.h"
struct MutationBufferART {

View File

@ -34,7 +34,7 @@
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define SevDebugMemory SevVerbose

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/FDBTypes.h"

View File

@ -25,7 +25,7 @@
#include <vector>
#include <unordered_map>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/BackupContainerFileSystem.h"
#include "fdbclient/BlobGranuleCommon.h"
#include "fdbclient/BlobWorkerInterface.h"

View File

@ -23,7 +23,7 @@
#include <utility>
#include <vector>
#include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "fmt/format.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/SystemData.h"
#include "fdbclient/BackupContainerFileSystem.h"

View File

@ -1,6 +1,9 @@
set(FDBSERVER_SRCS
ApplyMetadataMutation.cpp
ApplyMetadataMutation.h
art.h
art_impl.h
ArtMutationBuffer.h
BackupInterface.h
BackupProgress.actor.cpp
BackupProgress.actor.h
@ -19,6 +22,7 @@ set(FDBSERVER_SRCS
CommitProxyServer.actor.cpp
ConfigBroadcaster.actor.cpp
ConfigBroadcaster.h
ConfigBroadcastInterface.h
ConfigDatabaseUnitTests.actor.cpp
ConfigFollowerInterface.cpp
ConfigFollowerInterface.h
@ -38,7 +42,9 @@ set(FDBSERVER_SRCS
DBCoreState.h
DDTeamCollection.actor.cpp
DDTeamCollection.h
DDTxnProcessor.h
DDTxnProcessor.actor.cpp
DeltaTree.h
DiskQueue.actor.cpp
EncryptKeyProxy.actor.cpp
EncryptKeyProxyInterface.h
@ -338,6 +344,7 @@ add_library(fdb_sqlite STATIC
sqlite/sqliteInt.h
sqlite/sqliteLimit.h
sqlite/sqlite3.amalgamation.c)
target_include_directories(fdb_sqlite PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/sqlite)
if (WITH_ROCKSDB_EXPERIMENTAL)
add_definitions(-DSSD_ROCKSDB_EXPERIMENTAL)

View File

@ -2779,11 +2779,14 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
ASSERT(health.degradedPeers.find(badPeer1) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer2].startTime, health.degradedPeers[badPeer2].lastRefreshTime);
}
// Create an `UpdateWorkerHealthRequest` with two bad peers, one from the previous test and a new one.
// The one from the previous test should have lastRefreshTime updated.
// The other one from the previous test not included in this test should be removed.
// The other one from the previous test not included in this test should not be removed.
state double previousStartTime;
state double previousRefreshTime;
{
// Advance time so that now() is guaranteed to return a larger value than before.
wait(delay(0.001));
@ -2794,20 +2797,31 @@ TEST_CASE("/fdbserver/clustercontroller/updateWorkerHealth") {
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 2);
ASSERT_EQ(health.degradedPeers.size(), 3);
ASSERT(health.degradedPeers.find(badPeer1) != health.degradedPeers.end());
ASSERT_LT(health.degradedPeers[badPeer1].startTime, health.degradedPeers[badPeer1].lastRefreshTime);
ASSERT(health.degradedPeers.find(badPeer2) == health.degradedPeers.end());
ASSERT(health.degradedPeers.find(badPeer2) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer2].startTime, health.degradedPeers[badPeer2].lastRefreshTime);
ASSERT_EQ(health.degradedPeers[badPeer2].startTime, health.degradedPeers[badPeer1].startTime);
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer3].startTime, health.degradedPeers[badPeer3].lastRefreshTime);
previousStartTime = health.degradedPeers[badPeer3].startTime;
previousRefreshTime = health.degradedPeers[badPeer3].lastRefreshTime;
}
// Create an `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should remove the worker from
// Create an `UpdateWorkerHealthRequest` with empty `degradedPeers`, which should not remove the worker from
// `workerHealth`.
{
wait(delay(0.001));
UpdateWorkerHealthRequest req;
req.address = workerAddress;
data.updateWorkerHealth(req);
ASSERT(data.workerHealth.find(workerAddress) == data.workerHealth.end());
ASSERT(data.workerHealth.find(workerAddress) != data.workerHealth.end());
auto& health = data.workerHealth[workerAddress];
ASSERT_EQ(health.degradedPeers.size(), 3);
ASSERT(health.degradedPeers.find(badPeer3) != health.degradedPeers.end());
ASSERT_EQ(health.degradedPeers[badPeer3].startTime, previousStartTime);
ASSERT_EQ(health.degradedPeers[badPeer3].lastRefreshTime, previousRefreshTime);
}
return Void();
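These updated expectations follow from the cluster-controller change later in this diff: updateWorkerHealth no longer erases a worker when req.degradedPeers is empty and no longer purges previously reported peers that are absent from the incoming request, so entries persist with their original startTime until some other path expires them. The following toy model sketches the retention semantics the test now asserts, with std::string standing in for NetworkAddress; it is not the ClusterController implementation.

// Toy model of the asserted retention semantics; names and types are illustrative.
#include <string>
#include <unordered_map>
#include <vector>

struct DegradedTimes {
    double startTime = 0;
    double lastRefreshTime = 0;
};

using PeerMap = std::unordered_map<std::string, DegradedTimes>;
std::unordered_map<std::string, PeerMap> workerHealth; // worker address -> its known degraded peers

void updateWorkerHealthSketch(const std::string& worker,
                              const std::vector<std::string>& degradedPeers,
                              double currentTime) {
    auto& health = workerHealth[worker]; // the worker entry is kept (or created), even for an empty request
    for (const auto& peer : degradedPeers) {
        auto it = health.find(peer);
        if (it == health.end()) {
            health[peer] = DegradedTimes{ currentTime, currentTime }; // new peer: startTime and lastRefreshTime set together
        } else {
            it->second.lastRefreshTime = currentTime; // known peer: keep startTime, only bump the refresh time
        }
    }
    // Peers previously reported but absent from this request are deliberately left in place.
}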

View File

@ -2907,13 +2907,6 @@ public:
.detail("WorkerAddress", req.address)
.detail("DegradedPeers", degradedPeersString);
// `req.degradedPeers` contains the latest peer performance view from the worker. Clear the worker if the
// requested worker doesn't see any degraded peers.
if (req.degradedPeers.empty()) {
workerHealth.erase(req.address);
return;
}
double currentTime = now();
// Current `workerHealth` doesn't have any information about the incoming worker. Add the worker into
@ -2931,21 +2924,6 @@ public:
auto& health = workerHealth[req.address];
// First, remove any degraded peers recorded in `workerHealth` that aren't in the incoming request. These
// machines' network performance should have recovered.
std::unordered_set<NetworkAddress> recoveredPeers;
for (const auto& [peer, times] : health.degradedPeers) {
recoveredPeers.insert(peer);
}
for (const auto& peer : req.degradedPeers) {
if (recoveredPeers.find(peer) != recoveredPeers.end()) {
recoveredPeers.erase(peer);
}
}
for (const auto& peer : recoveredPeers) {
health.degradedPeers.erase(peer);
}
// Update the worker's degradedPeers.
for (const auto& peer : req.degradedPeers) {
auto it = health.degradedPeers.find(peer);

View File

@ -52,7 +52,7 @@
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Trace.h"
#include "flow/Tracing.h"
#include "fdbclient/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.

View File

@ -23,7 +23,7 @@
#pragma once
#include "fdbclient/CoordinationInterface.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbrpc/WellKnownEndpoints.h"
#include "fdbserver/ConfigFollowerInterface.h"
#include "fdbserver/ConfigBroadcastInterface.h"

View File

@ -20,7 +20,7 @@
#include "fdbserver/CoroFlow.h"
#include "flow/ActorCollection.h"
#include "fdbrpc/libcoroutine/Coro.h"
#include "Coro.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/actorcompiler.h" // has to be last include

View File

@ -2748,7 +2748,7 @@ public:
when(wait(checkSignal)) {
checkSignal = Never();
isFetchingResults = true;
serverListAndProcessClasses = getServerListAndProcessClasses(&tr);
serverListAndProcessClasses = NativeAPI::getServerListAndProcessClasses(&tr);
}
when(std::vector<std::pair<StorageServerInterface, ProcessClass>> results =
wait(serverListAndProcessClasses)) {
@ -2790,7 +2790,7 @@ public:
when(waitNext(serverRemoved)) {
if (isFetchingResults) {
tr = Transaction(self->cx);
serverListAndProcessClasses = getServerListAndProcessClasses(&tr);
serverListAndProcessClasses = NativeAPI::getServerListAndProcessClasses(&tr);
}
}
}

View File

@ -19,6 +19,7 @@
*/
#include "fdbserver/DDTxnProcessor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class DDTxnProcessorImpl {
@ -91,4 +92,9 @@ class DDTxnProcessorImpl {
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
return DDTxnProcessorImpl::getSourceServersForRange(cx, range);
}
Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> DDTxnProcessor::getServerListAndProcessClasses() {
Transaction tr(cx);
return NativeAPI::getServerListAndProcessClasses(&tr);
}

View File

@ -31,6 +31,10 @@ public:
};
// get the source server list and complete source server list for range
virtual Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) = 0;
// get the storage server list and process classes
virtual Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() = 0;
virtual ~IDDTxnProcessor() = default;
};
@ -46,6 +50,9 @@ public:
explicit DDTxnProcessor(Database cx) : cx(cx) {}
Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) override;
// Call NativeAPI implementation directly
Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses() override;
};
// run mock transaction

View File

@ -322,26 +322,6 @@ Future<Void> StorageWiggler::finishWiggle() {
return StorageWiggleMetrics::runSetTransaction(teamCollection->cx, teamCollection->isPrimary(), metrics);
}
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
Transaction* tr) {
state Future<std::vector<ProcessData>> workers = getWorkers(tr);
state Future<RangeResult> serverList = tr->getRange(serverListKeys, CLIENT_KNOBS->TOO_MANY);
wait(success(workers) && success(serverList));
ASSERT(!serverList.get().more && serverList.get().size() < CLIENT_KNOBS->TOO_MANY);
std::map<Optional<Standalone<StringRef>>, ProcessData> id_data;
for (int i = 0; i < workers.get().size(); i++)
id_data[workers.get()[i].locality.processId()] = workers.get()[i];
std::vector<std::pair<StorageServerInterface, ProcessClass>> results;
for (int i = 0; i < serverList.get().size(); i++) {
auto ssi = decodeServerListValue(serverList.get()[i].value);
results.emplace_back(ssi, id_data[ssi.locality.processId()].processClass);
}
return results;
}
ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
TraceEvent("DDTrackerStarting").log();
while (db->get().recoveryState < RecoveryState::ALL_LOGS_RECRUITED) {

View File

@ -518,7 +518,5 @@ struct StorageWiggler : ReferenceCounted<StorageWiggler> {
}
};
ACTOR Future<std::vector<std::pair<StorageServerInterface, ProcessClass>>> getServerListAndProcessClasses(
Transaction* tr);
#include "flow/unactorcompiler.h"
#endif

View File

@ -19,7 +19,7 @@
*/
#include "fdbserver/IDiskQueue.h"
#include "fdbrpc/IAsyncFile.h"
#include "flow/IAsyncFile.h"
#include "fdbserver/Knobs.h"
#include "fdbrpc/simulator.h"
#include "flow/crc32c.h"

View File

@ -27,7 +27,7 @@
#include "fdbrpc/FlowProcess.actor.h"
#include "fdbrpc/Net2FileSystem.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/WellKnownEndpoints.h"
#include "fdbrpc/WellKnownEndpoints.h"
#include "fdbclient/versions.h"
#include "fdbserver/CoroFlow.h"
#include "fdbserver/FDBExecHelper.actor.h"

View File

@ -110,9 +110,9 @@ std::string getErrorReason(BackgroundErrorReason reason) {
// could potentially cause segmentation fault.
class RocksDBErrorListener : public rocksdb::EventListener {
public:
RocksDBErrorListener(){};
RocksDBErrorListener(UID id) : id(id){};
void OnBackgroundError(rocksdb::BackgroundErrorReason reason, rocksdb::Status* bg_error) override {
TraceEvent(SevError, "RocksDBBGError")
TraceEvent(SevError, "RocksDBBGError", id)
.detail("Reason", getErrorReason(reason))
.detail("RocksDBSeverity", bg_error->severity())
.detail("Status", bg_error->ToString());
@ -145,6 +145,7 @@ public:
private:
ThreadReturnPromise<Void> errorPromise;
std::mutex mutex;
UID id;
};
using DB = rocksdb::DB*;
using CF = rocksdb::ColumnFamilyHandle*;
@ -365,12 +366,11 @@ gets deleted as the ref count becomes 0.
*/
class ReadIteratorPool {
public:
ReadIteratorPool(DB& db, CF& cf, const std::string& path)
ReadIteratorPool(UID id, DB& db, CF& cf)
: db(db), cf(cf), index(0), iteratorsReuseCount(0), readRangeOptions(getReadOptions()) {
readRangeOptions.background_purge_on_iterator_cleanup = true;
readRangeOptions.auto_prefix_mode = (SERVER_KNOBS->ROCKSDB_PREFIX_LEN > 0);
TraceEvent("ReadIteratorPool")
.detail("Path", path)
TraceEvent("ReadIteratorPool", id)
.detail("KnobRocksDBReadRangeReuseIterators", SERVER_KNOBS->ROCKSDB_READ_RANGE_REUSE_ITERATORS)
.detail("KnobRocksDBPrefixLen", SERVER_KNOBS->ROCKSDB_PREFIX_LEN);
}
@ -723,10 +723,10 @@ ACTOR Future<Void> refreshReadIteratorPool(std::shared_ptr<ReadIteratorPool> rea
return Void();
}
ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetchLock) {
ACTOR Future<Void> flowLockLogger(UID id, const FlowLock* readLock, const FlowLock* fetchLock) {
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e("RocksDBFlowLock");
TraceEvent e("RocksDBFlowLock", id);
e.detail("ReadAvailable", readLock->available());
e.detail("ReadActivePermits", readLock->activePermits());
e.detail("ReadWaiters", readLock->waiters());
@ -736,7 +736,8 @@ ACTOR Future<Void> flowLockLogger(const FlowLock* readLock, const FlowLock* fetc
}
}
ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> statistics,
ACTOR Future<Void> rocksDBMetricLogger(UID id,
std::shared_ptr<rocksdb::Statistics> statistics,
std::shared_ptr<PerfContextMetrics> perfContextMetrics,
rocksdb::DB* db,
std::shared_ptr<ReadIteratorPool> readIterPool,
@ -813,7 +814,7 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_METRICS_DELAY));
TraceEvent e("RocksDBMetrics");
TraceEvent e("RocksDBMetrics", id);
uint64_t stat;
for (auto& t : tickerStats) {
auto& [name, ticker, cum] = t;
@ -846,11 +847,12 @@ ACTOR Future<Void> rocksDBMetricLogger(std::shared_ptr<rocksdb::Statistics> stat
}
}
void logRocksDBError(const rocksdb::Status& status,
void logRocksDBError(UID id,
const rocksdb::Status& status,
const std::string& method,
Optional<Severity> sev = Optional<Severity>()) {
Severity level = sev.present() ? sev.get() : (status.IsTimedOut() ? SevWarn : SevError);
TraceEvent e(level, "RocksDBError");
TraceEvent e(level, "RocksDBError", id);
e.detail("Error", status.ToString()).detail("Method", method).detail("RocksDBSeverity", status.severity());
if (status.IsIOError()) {
e.detail("SubCode", status.subcode());
@ -976,7 +978,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
status = rocksdb::DB::Open(options, a.path, descriptors, &handles, &db);
if (!status.ok()) {
logRocksDBError(status, "Open");
logRocksDBError(id, status, "Open");
a.done.sendError(statusToError(status));
return;
}
@ -991,12 +993,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
if (cf == nullptr) {
status = db->CreateColumnFamily(cfOptions, SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, &cf);
if (!status.ok()) {
logRocksDBError(status, "Open");
logRocksDBError(id, status, "Open");
a.done.sendError(statusToError(status));
}
}
TraceEvent(SevInfo, "RocksDB")
TraceEvent(SevInfo, "RocksDB", id)
.detail("Path", a.path)
.detail("Method", "Open")
.detail("KnobRocksDBWriteRateLimiterBytesPerSec",
@ -1008,13 +1010,13 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// blockUntilReady() was getting the thread into a deadlock state, so call the
// metricsLogger directly.
a.metrics =
rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool, &a.counters) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
rocksDBMetricLogger(id, options.statistics, perfContextMetrics, db, readIterPool, &a.counters) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
} else {
onMainThread([&] {
a.metrics =
rocksDBMetricLogger(options.statistics, perfContextMetrics, db, readIterPool, &a.counters) &&
flowLockLogger(a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
a.metrics = rocksDBMetricLogger(
id, options.statistics, perfContextMetrics, db, readIterPool, &a.counters) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool);
return Future<bool>(true);
}).blockUntilReady();
}
@ -1088,7 +1090,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
DeleteVisitor dv(deletes, deletes.arena());
rocksdb::Status s = a.batchToCommit->Iterate(&dv);
if (!s.ok()) {
logRocksDBError(s, "CommitDeleteVisitor");
logRocksDBError(id, s, "CommitDeleteVisitor");
a.done.sendError(statusToError(s));
return;
}
@ -1111,7 +1113,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
if (!s.ok()) {
logRocksDBError(s, "Commit");
logRocksDBError(id, s, "Commit");
a.done.sendError(statusToError(s));
} else {
a.done.send(Void());
@ -1154,7 +1156,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
auto s = db->Close();
if (!s.ok()) {
logRocksDBError(s, "Close");
logRocksDBError(id, s, "Close");
}
if (a.deleteOnClose) {
std::set<std::string> columnFamilies{ "default" };
@ -1165,12 +1167,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
s = rocksdb::DestroyDB(a.path, getOptions(), descriptors);
if (!s.ok()) {
logRocksDBError(s, "Destroy");
logRocksDBError(id, s, "Destroy");
} else {
TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Destroy");
TraceEvent("RocksDB", id).detail("Path", a.path).detail("Method", "Destroy");
}
}
TraceEvent("RocksDB").detail("Path", a.path).detail("Method", "Close");
TraceEvent("RocksDB", id).detail("Path", a.path).detail("Method", "Close");
a.done.send(Void());
}
@ -1180,6 +1182,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
};
struct Reader : IThreadPoolReceiver {
UID id;
DB& db;
CF& cf;
double readValueTimeout;
@ -1192,13 +1195,14 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// ThreadReturnPromiseStream pair.first stores the histogram name and
// pair.second stores the corresponding measured latency (seconds)
explicit Reader(DB& db,
explicit Reader(UID id,
DB& db,
CF& cf,
std::shared_ptr<ReadIteratorPool> readIterPool,
std::shared_ptr<PerfContextMetrics> perfContextMetrics,
int threadIndex,
ThreadReturnPromiseStream<std::pair<std::string, double>>* metricPromiseStream)
: db(db), cf(cf), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics),
: id(id), db(db), cf(cf), readIterPool(readIterPool), perfContextMetrics(perfContextMetrics),
metricPromiseStream(metricPromiseStream), threadIndex(threadIndex) {
if (g_network->isSimulated()) {
// In simulation, increase the read operation timeouts to 5 minutes, as some of the tests have
@ -1252,7 +1256,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
traceBatch.get().addEvent("GetValueDebug", a.debugID.get().first(), "Reader.Before");
}
if (readBeginTime - a.startTime > readValueTimeout) {
TraceEvent(SevWarn, "KVSTimeout")
TraceEvent(SevWarn, "KVSTimeout", id)
.detail("Error", "Read value request timedout")
.detail("Method", "ReadValueAction")
.detail("TimeoutValue", readValueTimeout);
@ -1270,7 +1274,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
double dbGetBeginTime = a.getHistograms ? timer_monotonic() : 0;
auto s = db->Get(options, cf, toSlice(a.key), &value);
if (!s.ok() && !s.IsNotFound()) {
logRocksDBError(s, "ReadValue");
logRocksDBError(id, s, "ReadValue");
a.result.sendError(statusToError(s));
return;
}
@ -1289,7 +1293,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
} else if (s.IsNotFound()) {
a.result.send(Optional<Value>());
} else {
logRocksDBError(s, "ReadValue");
logRocksDBError(id, s, "ReadValue");
a.result.sendError(statusToError(s));
}
@ -1339,7 +1343,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
"Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
}
if (readBeginTime - a.startTime > readValuePrefixTimeout) {
TraceEvent(SevWarn, "KVSTimeout")
TraceEvent(SevWarn, "KVSTimeout", id)
.detail("Error", "Read value prefix request timedout")
.detail("Method", "ReadValuePrefixAction")
.detail("TimeoutValue", readValuePrefixTimeout);
@ -1373,7 +1377,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
} else if (s.IsNotFound()) {
a.result.send(Optional<Value>());
} else {
logRocksDBError(s, "ReadValuePrefix");
logRocksDBError(id, s, "ReadValuePrefix");
a.result.sendError(statusToError(s));
}
if (a.getHistograms) {
@ -1414,7 +1418,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
std::make_pair(ROCKSDB_READRANGE_QUEUEWAIT_HISTOGRAM.toString(), readBeginTime - a.startTime));
}
if (readBeginTime - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "KVSTimeout")
TraceEvent(SevWarn, "KVSTimeout", id)
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("TimeoutValue", readRangeTimeout);
@ -1446,7 +1450,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
break;
}
if (timer_monotonic() - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "KVSTimeout")
TraceEvent(SevWarn, "KVSTimeout", id)
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("TimeoutValue", readRangeTimeout);
@ -1478,7 +1482,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
break;
}
if (timer_monotonic() - a.startTime > readRangeTimeout) {
TraceEvent(SevWarn, "KVSTimeout")
TraceEvent(SevWarn, "KVSTimeout", id)
.detail("Error", "Read range request timedout")
.detail("Method", "ReadRangeAction")
.detail("TimeoutValue", readRangeTimeout);
@ -1492,7 +1496,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
}
if (!s.ok()) {
logRocksDBError(s, "ReadRange");
logRocksDBError(id, s, "ReadRange");
a.result.sendError(statusToError(s));
return;
}
@ -1543,12 +1547,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
explicit RocksDBKeyValueStore(const std::string& path, UID id)
: path(path), id(id), perfContextMetrics(new PerfContextMetrics()),
readIterPool(new ReadIteratorPool(db, defaultFdbCF, path)),
readIterPool(new ReadIteratorPool(id, db, defaultFdbCF)),
readSemaphore(SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
fetchSemaphore(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
errorListener(std::make_shared<RocksDBErrorListener>()), errorFuture(errorListener->getFuture()) {
errorListener(std::make_shared<RocksDBErrorListener>(id)), errorFuture(errorListener->getFuture()) {
// In simulation, run the reader/writer threads as Coro threads (i.e. in the network thread). The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
@ -1588,10 +1592,12 @@ struct RocksDBKeyValueStore : IKeyValueStore {
? metricPromiseStreams[SERVER_KNOBS->ROCKSDB_READ_PARALLELISM].get()
: nullptr),
"fdb-rocksdb-wr");
TraceEvent("RocksDBReadThreads").detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
TraceEvent("RocksDBReadThreads", id)
.detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
readThreads->addThread(
new Reader(db,
new Reader(id,
db,
defaultFdbCF,
readIterPool,
perfContextMetrics,
@ -1956,7 +1962,7 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
rocksdb::Checkpoint* checkpoint;
rocksdb::Status s = rocksdb::Checkpoint::Create(db, &checkpoint);
if (!s.ok()) {
logRocksDBError(s, "Checkpoint");
logRocksDBError(id, s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
@ -1966,7 +1972,7 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
s = db->Get(readOptions, cf, toSlice(persistVersion), &value);
if (!s.ok() && !s.IsNotFound()) {
logRocksDBError(s, "Checkpoint");
logRocksDBError(id, s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
@ -1988,7 +1994,7 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
platform::eraseDirectoryRecursive(checkpointDir);
s = checkpoint->ExportColumnFamily(cf, checkpointDir, &pMetadata);
if (!s.ok()) {
logRocksDBError(s, "ExportColumnFamily");
logRocksDBError(id, s, "ExportColumnFamily");
a.reply.sendError(statusToError(s));
return;
}
@ -2003,7 +2009,7 @@ void RocksDBKeyValueStore::Writer::action(CheckpointAction& a) {
uint64_t debugCheckpointSeq = -1;
s = checkpoint->CreateCheckpoint(checkpointDir, /*log_size_for_flush=*/0, &debugCheckpointSeq);
if (!s.ok()) {
logRocksDBError(s, "Checkpoint");
logRocksDBError(id, s, "Checkpoint");
a.reply.sendError(statusToError(s));
return;
}
@ -2056,10 +2062,10 @@ void RocksDBKeyValueStore::Writer::action(RestoreAction& a) {
getCFOptions(), SERVER_KNOBS->DEFAULT_FDB_ROCKSDB_COLUMN_FAMILY, importOptions, metaData, &cf);
if (!status.ok()) {
logRocksDBError(status, "Restore");
logRocksDBError(id, status, "Restore");
a.done.sendError(statusToError(status));
} else {
TraceEvent(SevInfo, "RocksDBRestoreCFSuccess")
TraceEvent(SevInfo, "RocksDBRestoreCFSuccess", id)
.detail("Path", a.path)
.detail("Checkpoint", a.checkpoints[0].toString());
a.done.send(Void());
@ -2071,7 +2077,7 @@ void RocksDBKeyValueStore::Writer::action(RestoreAction& a) {
.detail("Path", a.path)
.detail("Checkpoint", describe(a.checkpoints));
if (!status.ok()) {
logRocksDBError(status, "CreateColumnFamily");
logRocksDBError(id, status, "CreateColumnFamily");
a.done.sendError(statusToError(status));
return;
}
@ -2095,7 +2101,7 @@ void RocksDBKeyValueStore::Writer::action(RestoreAction& a) {
ingestOptions.verify_checksums_before_ingest = true;
status = db->IngestExternalFile(cf, sstFiles, ingestOptions);
if (!status.ok()) {
logRocksDBError(status, "IngestExternalFile", SevWarnAlways);
logRocksDBError(id, status, "IngestExternalFile", SevWarnAlways);
a.done.sendError(statusToError(status));
return;
}
@ -2123,7 +2129,7 @@ IKeyValueStore* keyValueStoreRocksDB(std::string const& path,
#ifdef SSD_ROCKSDB_EXPERIMENTAL
return new RocksDBKeyValueStore(path, logID);
#else
TraceEvent(SevError, "RocksDBEngineInitFailure").detail("Reason", "Built without RocksDB");
TraceEvent(SevError, "RocksDBEngineInitFailure", logID).detail("Reason", "Built without RocksDB");
ASSERT(false);
return nullptr;
#endif // SSD_ROCKSDB_EXPERIMENTAL
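The common thread in this file is plumbing the store's UID into every diagnostic path: RocksDBErrorListener, ReadIteratorPool, flowLockLogger, rocksDBMetricLogger, logRocksDBError, and the Reader threads all take an id now, and trace events use the TraceEvent(severity, type, id) form so events from different RocksDB instances in one process can be told apart. A minimal usage sketch of that form; the helper function name is illustrative, while TraceEvent and .detail() are the calls used in the hunks above.

// Illustrative helper showing how a per-store UID ends up on a trace line.
#include <string>
#include "flow/IRandom.h" // UID
#include "flow/Trace.h"

void traceRocksDBOpenFailure(UID storeId, const std::string& reason) {
    // The UID becomes the event's ID field, so "RocksDBError" events from different
    // RocksDBKeyValueStore instances in the same process remain distinguishable.
    TraceEvent(SevError, "RocksDBError", storeId).detail("Method", "Open").detail("Reason", reason);
}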

Some files were not shown because too many files have changed in this diff.