Merge branch 'master' of https://github.com/apple/foundationdb into debugging-cleanup
# Conflicts:
#	fdbbackup/FileDecoder.actor.cpp
commit e5e8a56b66
@@ -8,7 +8,7 @@ bindings/java/foundationdb-client*.jar
bindings/java/foundationdb-tests*.jar
bindings/java/fdb-java-*-sources.jar
packaging/msi/FDBInstaller.msi
builds/
build/
cmake-build-debug/
# Generated source, build, and packaging files
*.g.cpp

@@ -171,7 +171,7 @@ add_subdirectory(fdbbackup)
add_subdirectory(contrib)
add_subdirectory(tests)
add_subdirectory(flowbench EXCLUDE_FROM_ALL)
if(WITH_PYTHON)
if(WITH_PYTHON AND WITH_C_BINDING)
add_subdirectory(bindings)
endif()
if(WITH_DOCUMENTATION)

@@ -3,14 +3,16 @@ if(NOT OPEN_FOR_IDE)
add_subdirectory(c)
add_subdirectory(flow)
endif()
add_subdirectory(python)
if(WITH_JAVA)
if(WITH_PYTHON_BINDING)
add_subdirectory(python)
endif()
if(WITH_JAVA_BINDING)
add_subdirectory(java)
endif()
if(WITH_GO AND NOT OPEN_FOR_IDE)
if(WITH_GO_BINDING AND NOT OPEN_FOR_IDE)
add_subdirectory(go)
endif()
if(WITH_RUBY)
if(WITH_RUBY_BINDING)
add_subdirectory(ruby)
endif()
if(NOT WIN32 AND NOT OPEN_FOR_IDE)

@@ -79,6 +79,7 @@ if(NOT WIN32)
test/unit/fdb_api.hpp)

set(UNIT_TEST_VERSION_510_SRCS test/unit/unit_tests_version_510.cpp)
set(TRACE_PARTIAL_FILE_SUFFIX_TEST_SRCS test/unit/trace_partial_file_suffix_test.cpp)

if(OPEN_FOR_IDE)
add_library(fdb_c_performance_test OBJECT test/performance_test.c test/test.h)

@@ -88,6 +89,7 @@ if(NOT WIN32)
add_library(fdb_c_setup_tests OBJECT test/unit/setup_tests.cpp)
add_library(fdb_c_unit_tests OBJECT ${UNIT_TEST_SRCS})
add_library(fdb_c_unit_tests_version_510 OBJECT ${UNIT_TEST_VERSION_510_SRCS})
add_library(trace_partial_file_suffix_test OBJECT ${TRACE_PARTIAL_FILE_SUFFIX_TEST_SRCS})
else()
add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)

@@ -96,6 +98,7 @@ if(NOT WIN32)
add_executable(fdb_c_setup_tests test/unit/setup_tests.cpp)
add_executable(fdb_c_unit_tests ${UNIT_TEST_SRCS})
add_executable(fdb_c_unit_tests_version_510 ${UNIT_TEST_VERSION_510_SRCS})
add_executable(trace_partial_file_suffix_test ${TRACE_PARTIAL_FILE_SUFFIX_TEST_SRCS})
strip_debug_symbols(fdb_c_performance_test)
strip_debug_symbols(fdb_c_ryw_benchmark)
strip_debug_symbols(fdb_c_txn_size_test)

@@ -106,12 +109,14 @@ if(NOT WIN32)

add_dependencies(fdb_c_setup_tests doctest)
add_dependencies(fdb_c_unit_tests doctest)
add_dependencies(fdb_c_unit_tests_version_510 doctest)
target_include_directories(fdb_c_setup_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests_version_510 PUBLIC ${DOCTEST_INCLUDE_DIR})
target_link_libraries(fdb_c_setup_tests PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_unit_tests PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_unit_tests_version_510 PRIVATE fdb_c Threads::Threads)
target_link_libraries(trace_partial_file_suffix_test PRIVATE fdb_c Threads::Threads)

# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)

@@ -146,6 +151,11 @@ if(NOT WIN32)
COMMAND $<TARGET_FILE:fdb_c_unit_tests_version_510>
@CLUSTER_FILE@
fdb)
add_fdbclient_test(
NAME trace_partial_file_suffix_test
COMMAND $<TARGET_FILE:trace_partial_file_suffix_test>
@CLUSTER_FILE@
fdb)
add_fdbclient_test(
NAME fdb_c_external_client_unit_tests
COMMAND $<TARGET_FILE:fdb_c_unit_tests>

@@ -0,0 +1,111 @@
/*
 * trace_partial_file_suffix_test.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <fstream>
#include <iostream>
#include <random>
#include <string>
#include <thread>

#include "flow/Platform.h"

#define FDB_API_VERSION 710
#include "foundationdb/fdb_c.h"

#undef NDEBUG
#include <cassert>

void fdb_check(fdb_error_t e) {
    if (e) {
        std::cerr << fdb_get_error(e) << std::endl;
        std::abort();
    }
}

void set_net_opt(FDBNetworkOption option, const std::string& value) {
    fdb_check(fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.c_str()), value.size()));
}

bool file_exists(const char* path) {
    FILE* f = fopen(path, "r");
    if (f) {
        fclose(f);
        return true;
    }
    return false;
}

int main(int argc, char** argv) {
    fdb_check(fdb_select_api_version(710));

    std::string file_identifier = "trace_partial_file_suffix_test" + std::to_string(std::random_device{}());
    std::string trace_partial_file_suffix = ".tmp";
    std::string simulated_stray_partial_file =
        "trace.127.0.0.1." + file_identifier + ".simulated.xml" + trace_partial_file_suffix;

    // Simulate this process crashing previously by creating a ".tmp" file
    { std::ofstream file{ simulated_stray_partial_file }; }

    set_net_opt(FDBNetworkOption::FDB_NET_OPTION_TRACE_ENABLE, "");
    set_net_opt(FDBNetworkOption::FDB_NET_OPTION_TRACE_FILE_IDENTIFIER, file_identifier);
    set_net_opt(FDBNetworkOption::FDB_NET_OPTION_TRACE_PARTIAL_FILE_SUFFIX, trace_partial_file_suffix);

    fdb_check(fdb_setup_network());
    std::thread network_thread{ &fdb_run_network };

    // Apparently you need to open a database to initialize logging
    FDBDatabase* out;
    fdb_check(fdb_create_database(nullptr, &out));
    fdb_database_destroy(out);

    // Eventually there's a new trace file for this test ending in .tmp
    std::string name;
    for (;;) {
        for (const auto& path : platform::listFiles(".")) {
            if (path.find(file_identifier) != std::string::npos && path.find(".simulated.") == std::string::npos) {
                assert(path.substr(path.size() - trace_partial_file_suffix.size()) == trace_partial_file_suffix);
                name = path;
                break;
            }
        }
        if (!name.empty()) {
            break;
        }
    }

    fdb_check(fdb_stop_network());
    network_thread.join();

    // After shutting down, the suffix is removed for both the simulated stray file and our new file
    if (!trace_partial_file_suffix.empty()) {
        assert(!file_exists(name.c_str()));
        assert(!file_exists(simulated_stray_partial_file.c_str()));
    }

    auto new_name = name.substr(0, name.size() - trace_partial_file_suffix.size());
    auto new_stray_name =
        simulated_stray_partial_file.substr(0, simulated_stray_partial_file.size() - trace_partial_file_suffix.size());
    assert(file_exists(new_name.c_str()));
    assert(file_exists(new_stray_name.c_str()));
    remove(new_name.c_str());
    remove(new_stray_name.c_str());
    assert(!file_exists(new_name.c_str()));
    assert(!file_exists(new_stray_name.c_str()));
}

@@ -332,9 +332,10 @@ def transaction(logger):
    output7 = run_fdbcli_command('get', 'key')
    assert output7 == "`key': not found"

def get_fdb_process_addresses():
def get_fdb_process_addresses(logger):
    # get all processes' network addresses
    output = run_fdbcli_command('kill')
    logger.debug(output)
    # except the first line, each line is one process
    addresses = output.split('\n')[1:]
    assert len(addresses) == process_number

@@ -354,7 +355,7 @@ def coordinators(logger):
    assert coordinator_list[0]['address'] == coordinators
    # verify the cluster description
    assert get_value_from_status_json(True, 'cluster', 'connection_string').startswith('{}:'.format(cluster_description))
    addresses = get_fdb_process_addresses()
    addresses = get_fdb_process_addresses(logger)
    # set all 5 processes as coordinators and update the cluster description
    new_cluster_description = 'a_simple_description'
    run_fdbcli_command('coordinators', *addresses, 'description={}'.format(new_cluster_description))

@@ -369,7 +370,7 @@ def coordinators(logger):
@enable_logging()
def exclude(logger):
    # get all processes' network addresses
    addresses = get_fdb_process_addresses()
    addresses = get_fdb_process_addresses(logger)
    logger.debug("Cluster processes: {}".format(' '.join(addresses)))
    # There should be no excluded process for now
    no_excluded_process_output = 'There are currently no servers or localities excluded from the database.'

@@ -377,16 +378,28 @@ def exclude(logger):
    assert no_excluded_process_output in output1
    # randomly pick one and exclude the process
    excluded_address = random.choice(addresses)
    # If we see "not enough space" error, use FORCE option to proceed
    # this should be a safe operation as we do not need any storage space for the test
    force = False
    # sometimes we need to retry the exclude
    while True:
        logger.debug("Excluding process: {}".format(excluded_address))
        error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
        if force:
            error_message = run_fdbcli_command_and_get_error('exclude', 'FORCE', excluded_address)
        else:
            error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
        if error_message == 'WARNING: {} is a coordinator!'.format(excluded_address):
            # exclude coordinator will print the warning, verify the randomly selected process is the coordinator
            coordinator_list = get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')
            assert len(coordinator_list) == 1
            assert coordinator_list[0]['address'] == excluded_address
            break
        elif 'ERROR: This exclude may cause the total free space in the cluster to drop below 10%.' in error_message:
            # exclude the process may cause the free space not enough
            # use FORCE option to ignore it and proceed
            assert not force
            force = True
            logger.debug("Use FORCE option to exclude the process")
        elif not error_message:
            break
        else:

@@ -436,7 +449,8 @@ if __name__ == '__main__':
    # assertions will fail if fdbcli does not work as expected
    process_number = int(sys.argv[3])
    if process_number == 1:
        advanceversion()
        # TODO: disable for now, the change can cause the database unavailable
        #advanceversion()
        cache_range()
        consistencycheck()
        datadistribution()

@@ -449,10 +463,7 @@ if __name__ == '__main__':
        throttle()
    else:
        assert process_number > 1, "Process number should be positive"
        # the kill command which used to list processes seems to not work as expected sometime
        # which makes the test flaky.
        # We need to figure out the reason and then re-enable these tests
        #coordinators()
        #exclude()
        coordinators()
        exclude()

@@ -352,7 +352,7 @@ function(package_bindingtester)
COMMENT "Copy Flow tester for bindingtester")

set(generated_binding_files python/fdb/fdboptions.py)
if(WITH_JAVA)
if(WITH_JAVA_BINDING)
if(NOT FDB_RELEASE)
set(prerelease_string "-PRERELEASE")
else()

@@ -369,7 +369,7 @@ function(package_bindingtester)
set(generated_binding_files ${generated_binding_files} java/foundationdb-tests.jar)
endif()

if(WITH_GO AND NOT OPEN_FOR_IDE)
if(WITH_GO_BINDING AND NOT OPEN_FOR_IDE)
add_dependencies(copy_binding_output_files fdb_go_tester fdb_go)
add_custom_command(
TARGET copy_binding_output_files

@ -1,61 +1,73 @@
|
|||
function(compile_boost)
|
||||
|
||||
# Initialize function incoming parameters
|
||||
set(options)
|
||||
set(oneValueArgs TARGET)
|
||||
set(multiValueArgs BUILD_ARGS CXXFLAGS LDFLAGS)
|
||||
cmake_parse_arguments(MY "${options}" "${oneValueArgs}"
|
||||
cmake_parse_arguments(COMPILE_BOOST "${options}" "${oneValueArgs}"
|
||||
"${multiValueArgs}" ${ARGN} )
|
||||
# Configure the boost toolset to use
|
||||
set(BOOTSTRAP_ARGS "--with-libraries=context")
|
||||
set(B2_COMMAND "./b2")
|
||||
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++14 -w)
|
||||
|
||||
# Configure bootstrap command
|
||||
set(BOOTSTRAP_COMMAND "./bootstrap.sh")
|
||||
set(BOOTSTRAP_LIBRARIES "context")
|
||||
|
||||
set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}")
|
||||
if(APPLE)
|
||||
set(BOOST_TOOLSET "clang-darwin")
|
||||
# this is to fix a weird macOS issue -- by default
|
||||
# cmake would otherwise pass a compiler that can't
|
||||
# compile boost
|
||||
set(BOOST_CXX_COMPILER "/usr/bin/clang++")
|
||||
elseif(CLANG)
|
||||
if(CLANG)
|
||||
set(BOOST_TOOLSET "clang")
|
||||
list(APPEND BOOTSTRAP_ARGS "${BOOTSTRAP_COMMAND} --with-toolset=clang")
|
||||
if(APPLE)
|
||||
# this is to fix a weird macOS issue -- by default
|
||||
# cmake would otherwise pass a compiler that can't
|
||||
# compile boost
|
||||
set(BOOST_CXX_COMPILER "/usr/bin/clang++")
|
||||
endif()
|
||||
else()
|
||||
set(BOOST_TOOLSET "gcc")
|
||||
endif()
|
||||
if(APPLE OR USE_LIBCXX)
|
||||
list(APPEND BOOST_COMPILER_FLAGS -stdlib=libc++)
|
||||
endif()
|
||||
set(BOOST_ADDITIONAL_COMPILE_OPTIOINS "")
|
||||
foreach(flag IN LISTS BOOST_COMPILER_FLAGS MY_CXXFLAGS)
|
||||
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIOINS "<cxxflags>${flag} ")
|
||||
endforeach()
|
||||
foreach(flag IN LISTS MY_LDFLAGS)
|
||||
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIOINS "<linkflags>${flag} ")
|
||||
endforeach()
|
||||
configure_file(${CMAKE_SOURCE_DIR}/cmake/user-config.jam.cmake ${CMAKE_BINARY_DIR}/user-config.jam)
|
||||
message(STATUS "Use ${BOOST_TOOLSET} to build boost")
|
||||
|
||||
# Configure b2 command
|
||||
set(B2_COMMAND "./b2")
|
||||
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++17 -w)
|
||||
set(BOOST_LINK_FLAGS "")
|
||||
if(APPLE OR CLANG OR USE_LIBCXX)
|
||||
list(APPEND BOOST_COMPILER_FLAGS -stdlib=libc++ -nostdlib++)
|
||||
list(APPEND BOOST_LINK_FLAGS -static-libgcc -lc++ -lc++abi)
|
||||
endif()
|
||||
|
||||
# Update the user-config.jam
|
||||
set(BOOST_ADDITIONAL_COMPILE_OPTIOINS "")
|
||||
foreach(flag IN LISTS BOOST_COMPILER_FLAGS COMPILE_BOOST_CXXFLAGS)
|
||||
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIONS "<cxxflags>${flag} ")
|
||||
endforeach()
|
||||
#foreach(flag IN LISTS BOOST_LINK_FLAGS COMPILE_BOOST_LDFLAGS)
|
||||
# string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIONS "<linkflags>${flag} ")
|
||||
#endforeach()
|
||||
configure_file(${CMAKE_SOURCE_DIR}/cmake/user-config.jam.cmake ${CMAKE_BINARY_DIR}/user-config.jam)
|
||||
set(USER_CONFIG_FLAG --user-config=${CMAKE_BINARY_DIR}/user-config.jam)
|
||||
|
||||
# Build boost
|
||||
include(ExternalProject)
|
||||
set(BOOST_INSTALL_DIR "${CMAKE_BINARY_DIR}/boost_install")
|
||||
ExternalProject_add("${MY_TARGET}Project"
|
||||
ExternalProject_add("${COMPILE_BOOST_TARGET}Project"
|
||||
URL "https://boostorg.jfrog.io/artifactory/main/release/1.72.0/source/boost_1_72_0.tar.bz2"
|
||||
URL_HASH SHA256=59c9b274bc451cf91a9ba1dd2c7fdcaf5d60b1b3aa83f2c9fa143417cc660722
|
||||
CONFIGURE_COMMAND ./bootstrap.sh ${BOOTSTRAP_ARGS}
|
||||
BUILD_COMMAND ${B2_COMMAND} link=static ${MY_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
|
||||
CONFIGURE_COMMAND ${BOOTSTRAP_COMMAND} ${BOOTSTRAP_ARGS} --with-libraries=${BOOTSTRAP_LIBRARIES} --with-toolset=${BOOST_TOOLSET}
|
||||
BUILD_COMMAND ${B2_COMMAND} link=static ${COMPILE_BOOST_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
|
||||
BUILD_IN_SOURCE ON
|
||||
INSTALL_COMMAND ""
|
||||
UPDATE_COMMAND ""
|
||||
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/boost/config.hpp"
|
||||
"${BOOST_INSTALL_DIR}/lib/libboost_context.a")
|
||||
|
||||
add_library(${MY_TARGET}_context STATIC IMPORTED)
|
||||
add_dependencies(${MY_TARGET}_context ${MY_TARGET}Project)
|
||||
set_target_properties(${MY_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
|
||||
add_library(${COMPILE_BOOST_TARGET}_context STATIC IMPORTED)
|
||||
add_dependencies(${COMPILE_BOOST_TARGET}_context ${COMPILE_BOOST_TARGET}Project)
|
||||
set_target_properties(${COMPILE_BOOST_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
|
||||
|
||||
add_library(${MY_TARGET} INTERFACE)
|
||||
target_include_directories(${MY_TARGET} SYSTEM INTERFACE ${BOOST_INSTALL_DIR}/include)
|
||||
target_link_libraries(${MY_TARGET} INTERFACE ${MY_TARGET}_context)
|
||||
endfunction()
|
||||
add_library(${COMPILE_BOOST_TARGET} INTERFACE)
|
||||
target_include_directories(${COMPILE_BOOST_TARGET} SYSTEM INTERFACE ${BOOST_INSTALL_DIR}/include)
|
||||
target_link_libraries(${COMPILE_BOOST_TARGET} INTERFACE ${COMPILE_BOOST_TARGET}_context)
|
||||
|
||||
endfunction(compile_boost)
|
||||
|
||||
if(USE_SANITIZER)
|
||||
if(WIN32)
|
||||
|
@ -72,10 +84,20 @@ if(USE_SANITIZER)
|
|||
return()
|
||||
endif()
|
||||
|
||||
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0)
|
||||
# since boost 1.72 boost installs cmake configs. We will enforce config mode
|
||||
set(Boost_USE_STATIC_LIBS ON)
|
||||
set(BOOST_HINT_PATHS /opt/boost_1_72_0)
|
||||
|
||||
# Clang and Gcc will have different name mangling to std::call_once, etc.
|
||||
if (UNIX AND CMAKE_CXX_COMPILER_ID MATCHES "Clang$")
|
||||
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0_clang)
|
||||
set(BOOST_HINT_PATHS /opt/boost_1_72_0_clang)
|
||||
message(STATUS "Using Clang version of boost::context")
|
||||
else ()
|
||||
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0)
|
||||
set(BOOST_HINT_PATHS /opt/boost_1_72_0)
|
||||
message(STATUS "Using g++ version of boost::context")
|
||||
endif ()
|
||||
|
||||
if(BOOST_ROOT)
|
||||
list(APPEND BOOST_HINT_PATHS ${BOOST_ROOT})
|
||||
endif()
|
||||
|
|
|
@@ -261,10 +261,6 @@ else()

if (CLANG)
add_compile_options()
# Clang has link errors unless `atomic` is specifically requested.
if(NOT APPLE)
#add_link_options(-latomic)
endif()
if (APPLE OR USE_LIBCXX)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
if (NOT APPLE)

@ -47,22 +47,6 @@ else()
|
|||
endif()
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# Java Bindings
|
||||
################################################################################
|
||||
|
||||
set(WITH_JAVA OFF)
|
||||
find_package(JNI 1.8)
|
||||
find_package(Java 1.8 COMPONENTS Development)
|
||||
# leave FreeBSD JVM compat for later
|
||||
if(JNI_FOUND AND Java_FOUND AND Java_Development_FOUND AND NOT (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD"))
|
||||
set(WITH_JAVA ON)
|
||||
include(UseJava)
|
||||
enable_language(Java)
|
||||
else()
|
||||
set(WITH_JAVA OFF)
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# Python Bindings
|
||||
################################################################################
|
||||
|
@ -75,12 +59,57 @@ else()
|
|||
set(WITH_PYTHON OFF)
|
||||
endif()
|
||||
|
||||
option(BUILD_PYTHON_BINDING "build python binding" ON)
|
||||
if(NOT BUILD_PYTHON_BINDING OR NOT WITH_PYTHON)
|
||||
set(WITH_PYTHON_BINDING OFF)
|
||||
else()
|
||||
if(WITH_PYTHON)
|
||||
set(WITH_PYTHON_BINDING ON)
|
||||
else()
|
||||
#message(FATAL_ERROR "Could not found a suitable python interpreter")
|
||||
set(WITH_PYTHON_BINDING OFF)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# C Bindings
|
||||
################################################################################
|
||||
|
||||
option(BUILD_C_BINDING "build C binding" ON)
|
||||
if(BUILD_C_BINDING AND WITH_PYTHON)
|
||||
set(WITH_C_BINDING ON)
|
||||
else()
|
||||
set(WITH_C_BINDING OFF)
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# Java Bindings
|
||||
################################################################################
|
||||
|
||||
option(BUILD_JAVA_BINDING "build java binding" ON)
|
||||
if(NOT BUILD_JAVA_BINDING OR NOT WITH_C_BINDING)
|
||||
set(WITH_JAVA_BINDING OFF)
|
||||
else()
|
||||
set(WITH_JAVA_BINDING OFF)
|
||||
find_package(JNI 1.8)
|
||||
find_package(Java 1.8 COMPONENTS Development)
|
||||
# leave FreeBSD JVM compat for later
|
||||
if(JNI_FOUND AND Java_FOUND AND Java_Development_FOUND AND NOT (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") AND WITH_C_BINDING)
|
||||
set(WITH_JAVA_BINDING ON)
|
||||
include(UseJava)
|
||||
enable_language(Java)
|
||||
else()
|
||||
set(WITH_JAVA_BINDING OFF)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# Pip
|
||||
################################################################################
|
||||
|
||||
option(BUILD_DOCUMENTATION "build documentation" ON)
|
||||
find_package(Python3 COMPONENTS Interpreter)
|
||||
if (Python3_Interpreter_FOUND)
|
||||
if (WITH_PYTHON AND Python3_Interpreter_FOUND AND BUILD_DOCUMENTATION)
|
||||
set(WITH_DOCUMENTATION ON)
|
||||
else()
|
||||
set(WITH_DOCUMENTATION OFF)
|
||||
|
@ -90,27 +119,37 @@ endif()
|
|||
# GO
|
||||
################################################################################
|
||||
|
||||
find_program(GO_EXECUTABLE go)
|
||||
# building the go binaries is currently not supported on Windows
|
||||
if(GO_EXECUTABLE AND NOT WIN32)
|
||||
set(WITH_GO ON)
|
||||
option(BUILD_GO_BINDING "build go binding" ON)
|
||||
if(NOT BUILD_GO_BINDING OR NOT BUILD_C_BINDING)
|
||||
set(WITH_GO_BINDING OFF)
|
||||
else()
|
||||
set(WITH_GO OFF)
|
||||
endif()
|
||||
if (USE_SANITIZER)
|
||||
# Disable building go for sanitizers, since _stacktester doesn't link properly
|
||||
set(WITH_GO OFF)
|
||||
find_program(GO_EXECUTABLE go)
|
||||
# building the go binaries is currently not supported on Windows
|
||||
if(GO_EXECUTABLE AND NOT WIN32 AND WITH_C_BINDING)
|
||||
set(WITH_GO_BINDING ON)
|
||||
else()
|
||||
set(WITH_GO_BINDING OFF)
|
||||
endif()
|
||||
if (USE_SANITIZER)
|
||||
# Disable building go for sanitizers, since _stacktester doesn't link properly
|
||||
set(WITH_GO_BINDING OFF)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
# Ruby
|
||||
################################################################################
|
||||
|
||||
find_program(GEM_EXECUTABLE gem)
|
||||
set(WITH_RUBY OFF)
|
||||
if(GEM_EXECUTABLE)
|
||||
set(GEM_COMMAND ${RUBY_EXECUTABLE} ${GEM_EXECUTABLE})
|
||||
set(WITH_RUBY ON)
|
||||
option(BUILD_RUBY_BINDING "build ruby binding" ON)
|
||||
if(NOT BUILD_RUBY_BINDING OR NOT BUILD_C_BINDING)
|
||||
set(WITH_RUBY_BINDING OFF)
|
||||
else()
|
||||
find_program(GEM_EXECUTABLE gem)
|
||||
set(WITH_RUBY_BINDING OFF)
|
||||
if(GEM_EXECUTABLE AND WITH_C_BINDING)
|
||||
set(GEM_COMMAND ${RUBY_EXECUTABLE} ${GEM_EXECUTABLE})
|
||||
set(WITH_RUBY_BINDING ON)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
################################################################################
|
||||
|
@ -160,20 +199,22 @@ function(print_components)
|
|||
message(STATUS "=========================================")
|
||||
message(STATUS " Components Build Overview ")
|
||||
message(STATUS "=========================================")
|
||||
message(STATUS "Build Java Bindings: ${WITH_JAVA}")
|
||||
message(STATUS "Build with TLS support: ${WITH_TLS}")
|
||||
message(STATUS "Build Go bindings: ${WITH_GO}")
|
||||
message(STATUS "Build Ruby bindings: ${WITH_RUBY}")
|
||||
message(STATUS "Build Python sdist (make package): ${WITH_PYTHON}")
|
||||
message(STATUS "Build Documentation (make html): ${WITH_DOCUMENTATION}")
|
||||
message(STATUS "Build Bindings (depends on Python): ${WITH_PYTHON}")
|
||||
message(STATUS "Build C Bindings: ${WITH_C_BINDING}")
|
||||
message(STATUS "Build Python Bindings: ${WITH_PYTHON_BINDING}")
|
||||
message(STATUS "Build Java Bindings: ${WITH_JAVA_BINDING}")
|
||||
message(STATUS "Build Go bindings: ${WITH_GO_BINDING}")
|
||||
message(STATUS "Build Ruby bindings: ${WITH_RUBY_BINDING}")
|
||||
message(STATUS "Build with TLS support: ${WITH_TLS}")
|
||||
message(STATUS "Build Documentation (make html): ${WITH_DOCUMENTATION}")
|
||||
message(STATUS "Build Python sdist (make package): ${WITH_PYTHON_BINDING}")
|
||||
message(STATUS "Configure CTest (depends on Python): ${WITH_PYTHON}")
|
||||
message(STATUS "Build with RocksDB: ${WITH_ROCKSDB_EXPERIMENTAL}")
|
||||
message(STATUS "=========================================")
|
||||
endfunction()
|
||||
|
||||
if(FORCE_ALL_COMPONENTS)
|
||||
if(NOT WITH_JAVA OR NOT WITH_TLS OR NOT WITH_GO OR NOT WITH_RUBY OR NOT WITH_PYTHON OR NOT WITH_DOCUMENTATION)
|
||||
if(NOT WITH_C_BINDING OR NOT WITH_JAVA_BINDING OR NOT WITH_TLS OR NOT WITH_GO_BINDING OR NOT WITH_RUBY_BINDING OR NOT WITH_PYTHON_BINDING OR NOT WITH_DOCUMENTATION)
|
||||
print_components()
|
||||
message(FATAL_ERROR "FORCE_ALL_COMPONENTS is set but not all dependencies could be found")
|
||||
endif()
|
||||
|
|
|
@@ -1 +1 @@
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIOINS@ ;
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIONS@ ;

@@ -2,10 +2,17 @@
Release Notes
#############

6.3.19
======
* Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5330) <https://github.com/apple/foundationdb/pull/5330>`_

6.3.18
======
* The multi-version client API would not propagate errors that occurred when creating databases on external clients. This could result in a invalid memory accesses. `(PR #5221) <https://github.com/apple/foundationdb/pull/5221>`_
* Fixed a race between the multi-version client connecting to a cluster and destroying the database that could cause an assertion failure. `(PR #5221) <https://github.com/apple/foundationdb/pull/5221>`_
* Added Mako latency measurements. `(PR #5255) <https://github.com/apple/foundationdb/pull/5255>`_
* Fixed a bug introduced when porting restoring an inconsistent snapshot feature from 7.0 branch to 6.3 branch. The parameter that controls whether to perform an inconsistent snapshot restore may instead be used to lock the database during restore. `(PR #5228) <https://github.com/apple/foundationdb/pull/5228>`_
* Added SidebandMultiThreadClientTest, which validates causal consistency for multi-threaded client. `(PR #5173) <https://github.com/apple/foundationdb/pull/5173>`_

6.3.17
======

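As context for the ``trace_partial_file_suffix`` release note above, the following is a minimal sketch of enabling the option through the C API. It mirrors the new trace_partial_file_suffix_test.cpp added by this commit; the ".part" suffix value, the set_net_opt helper name, and the omitted error handling are illustrative assumptions rather than part of the commit.

#define FDB_API_VERSION 710
#include "foundationdb/fdb_c.h"

#include <string>
#include <thread>

static void set_net_opt(FDBNetworkOption option, const std::string& value) {
    // A real client should check the returned fdb_error_t; omitted here for brevity.
    fdb_network_set_option(option, reinterpret_cast<const uint8_t*>(value.c_str()), value.size());
}

int main() {
    fdb_select_api_version(710);

    // Empty value = write trace files to the current working directory.
    set_net_opt(FDB_NET_OPTION_TRACE_ENABLE, "");
    // ".part" is an arbitrary suffix chosen for this sketch; unfinished trace files carry it
    // and are renamed to drop it once the file is complete or the network is shut down.
    set_net_opt(FDB_NET_OPTION_TRACE_PARTIAL_FILE_SUFFIX, ".part");

    fdb_setup_network();
    std::thread network_thread{ &fdb_run_network };
    // ... use the client ...
    fdb_stop_network();
    network_thread.join();
    return 0;
}
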
@@ -29,6 +29,7 @@ Features
* Added perpetual storage wiggle that supports less impactful B-trees recreation and data migration. These will also be used for deploying the Testing Storage Server which compares 2 storage engines' results. See :ref:`Documentation <perpetual-storage-wiggle>` for details. `(PR #4838) <https://github.com/apple/foundationdb/pull/4838>`_
* Improved the efficiency with which storage servers replicate data between themselves. `(PR #5017) <https://github.com/apple/foundationdb/pull/5017>`_
* Added support to ``exclude command`` to exclude based on locality match. `(PR #5113) <https://github.com/apple/foundationdb/pull/5113>`_
* Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5328) <https://github.com/apple/foundationdb/pull/5328>`_

Performance
-----------

@ -311,71 +311,10 @@ std::vector<LogFile> getRelevantLogFiles(const std::vector<LogFile>& files, cons
|
|||
return filtered;
|
||||
}
|
||||
|
||||
std::pair<Version, int32_t> decode_key(const StringRef& key) {
|
||||
ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
|
||||
|
||||
uint8_t hash;
|
||||
Version version;
|
||||
int32_t part;
|
||||
BinaryReader rd(key, Unversioned());
|
||||
rd >> hash >> version >> part;
|
||||
version = bigEndian64(version);
|
||||
part = bigEndian32(part);
|
||||
|
||||
int32_t v = version / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
|
||||
ASSERT(((uint8_t)hashlittle(&v, sizeof(v), 0)) == hash);
|
||||
|
||||
return std::make_pair(version, part);
|
||||
}
|
||||
|
||||
// Decodes an encoded list of mutations in the format of:
|
||||
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
|
||||
// where a mutation is encoded as:
|
||||
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][key][value]
|
||||
std::vector<MutationRef> decode_value(const StringRef& value) {
|
||||
StringRefReader reader(value, restore_corrupted_data());
|
||||
|
||||
reader.consume<uint64_t>(); // Consume the includeVersion
|
||||
uint32_t val_length = reader.consume<uint32_t>();
|
||||
if (val_length != value.size() - sizeof(uint64_t) - sizeof(uint32_t)) {
|
||||
TraceEvent(SevError, "ValueError")
|
||||
.detail("ValueLen", val_length)
|
||||
.detail("ValueSize", value.size())
|
||||
.detail("Value", printable(value));
|
||||
}
|
||||
|
||||
std::vector<MutationRef> mutations;
|
||||
while (1) {
|
||||
if (reader.eof())
|
||||
break;
|
||||
|
||||
// Deserialization of a MutationRef, which was packed by MutationListRef::push_back_deep()
|
||||
uint32_t type, p1len, p2len;
|
||||
type = reader.consume<uint32_t>();
|
||||
p1len = reader.consume<uint32_t>();
|
||||
p2len = reader.consume<uint32_t>();
|
||||
|
||||
const uint8_t* key = reader.consume(p1len);
|
||||
const uint8_t* val = reader.consume(p2len);
|
||||
|
||||
mutations.emplace_back((MutationRef::Type)type, StringRef(key, p1len), StringRef(val, p2len));
|
||||
}
|
||||
return mutations;
|
||||
}
|
||||
|
||||
struct VersionedMutations {
|
||||
Version version;
|
||||
std::vector<MutationRef> mutations;
|
||||
Arena arena; // The arena that contains the mutations.
|
||||
};
|
||||
|
||||
struct VersionedKVPart {
|
||||
Arena arena;
|
||||
Version version;
|
||||
int32_t part;
|
||||
StringRef kv;
|
||||
VersionedKVPart(Arena arena, Version version, int32_t part, StringRef kv)
|
||||
: arena(arena), version(version), part(part), kv(kv) {}
|
||||
std::string serializedMutations; // buffer that contains mutations
|
||||
};
|
||||
|
||||
/*
|
||||
|
@ -395,174 +334,66 @@ struct VersionedKVPart {
|
|||
* at any time this object might have two blocks of data in memory.
|
||||
*/
|
||||
class DecodeProgress {
|
||||
std::vector<VersionedKVPart> keyValues;
|
||||
std::vector<Standalone<VectorRef<KeyValueRef>>> blocks;
|
||||
std::unordered_map<Version, fileBackup::AccumulatedMutations> mutationBlocksByVersion;
|
||||
|
||||
public:
|
||||
DecodeProgress() = default;
|
||||
template <class U>
|
||||
DecodeProgress(const LogFile& file, U&& values) : keyValues(std::forward<U>(values)), file(file) {}
|
||||
DecodeProgress(const LogFile& file) : file(file) {}
|
||||
|
||||
// If there are no more mutations to pull from the file.
|
||||
// However, we could have unfinished version in the buffer when EOF is true,
|
||||
// which means we should look for data in the next file. The caller
|
||||
// should call getUnfinishedBuffer() to get these left data.
|
||||
bool finished() const { return (eof && keyValues.empty()) || (leftover && !keyValues.empty()); }
|
||||
|
||||
std::vector<VersionedKVPart>&& getUnfinishedBuffer() && { return std::move(keyValues); }
|
||||
|
||||
// Returns all mutations of the next version in a batch.
|
||||
Future<VersionedMutations> getNextBatch() { return getNextBatchImpl(this); }
|
||||
bool finished() const { return done; }
|
||||
|
||||
// Open and loads file into memory
|
||||
Future<Void> openFile(Reference<IBackupContainer> container) { return openFileImpl(this, container); }
|
||||
|
||||
// The following are private APIs:
|
||||
|
||||
// Returns true if value contains complete data.
|
||||
static bool isValueComplete(StringRef value) {
|
||||
StringRefReader reader(value, restore_corrupted_data());
|
||||
|
||||
reader.consume<uint64_t>(); // Consume the includeVersion
|
||||
uint32_t val_length = reader.consume<uint32_t>();
|
||||
return val_length == value.size() - sizeof(uint64_t) - sizeof(uint32_t);
|
||||
}
|
||||
|
||||
// PRECONDITION: finished() must return false before calling this function.
|
||||
// Returns the next batch of mutations along with the arena backing it.
|
||||
// Note the returned batch can be empty when the file has unfinished
|
||||
// version batch data that are in the next file.
|
||||
ACTOR static Future<VersionedMutations> getNextBatchImpl(DecodeProgress* self) {
|
||||
ASSERT(!self->finished());
|
||||
VersionedMutations getNextBatch() {
|
||||
ASSERT(!finished());
|
||||
|
||||
loop {
|
||||
if (self->keyValues.size() <= 1) {
|
||||
// Try to decode another block when less than one left
|
||||
wait(readAndDecodeFile(self));
|
||||
}
|
||||
|
||||
const auto& kv = self->keyValues[0];
|
||||
ASSERT(kv.part == 0);
|
||||
|
||||
// decode next versions, check if they are continuous parts
|
||||
int idx = 1; // next kv pair in "keyValues"
|
||||
int bufSize = kv.kv.size();
|
||||
for (int lastPart = 0; idx < self->keyValues.size(); idx++, lastPart++) {
|
||||
if (idx == self->keyValues.size())
|
||||
break;
|
||||
|
||||
const auto& nextKV = self->keyValues[idx];
|
||||
if (kv.version != nextKV.version) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (lastPart + 1 != nextKV.part) {
|
||||
TraceEvent("DecodeError").detail("Part1", lastPart).detail("Part2", nextKV.part);
|
||||
throw restore_corrupted_data();
|
||||
}
|
||||
bufSize += nextKV.kv.size();
|
||||
}
|
||||
|
||||
VersionedMutations m;
|
||||
m.version = kv.version;
|
||||
TraceEvent("Decode").detail("Version", m.version).detail("Idx", idx).detail("Q", self->keyValues.size());
|
||||
StringRef value = kv.kv;
|
||||
if (idx > 1) {
|
||||
// Stitch parts into one and then decode one by one
|
||||
Standalone<StringRef> buf = self->combineValues(idx, bufSize);
|
||||
value = buf;
|
||||
m.arena = buf.arena();
|
||||
}
|
||||
if (isValueComplete(value)) {
|
||||
m.mutations = decode_value(value);
|
||||
if (m.arena.getSize() == 0) {
|
||||
m.arena = kv.arena;
|
||||
}
|
||||
self->keyValues.erase(self->keyValues.begin(), self->keyValues.begin() + idx);
|
||||
return m;
|
||||
} else if (!self->eof) {
|
||||
// Read one more block, hopefully the missing part of the value can be found.
|
||||
wait(readAndDecodeFile(self));
|
||||
} else {
|
||||
TraceEvent(SevWarn, "MissingValue").detail("Version", m.version);
|
||||
self->leftover = true;
|
||||
return m; // Empty mutations
|
||||
VersionedMutations vms;
|
||||
for (auto& [version, m] : mutationBlocksByVersion) {
|
||||
if (m.isComplete()) {
|
||||
vms.version = version;
|
||||
std::vector<MutationRef> mutations = fileBackup::decodeMutationLogValue(m.serializedMutations);
|
||||
TraceEvent("Decode").detail("Version", vms.version).detail("N", mutations.size());
|
||||
vms.mutations.insert(vms.mutations.end(), mutations.begin(), mutations.end());
|
||||
vms.serializedMutations = m.serializedMutations;
|
||||
mutationBlocksByVersion.erase(version);
|
||||
return vms;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a buffer which stitches first "idx" values into one.
|
||||
// "len" MUST equal the summation of these values.
|
||||
Standalone<StringRef> combineValues(const int idx, const int len) {
|
||||
ASSERT(idx <= keyValues.size() && idx > 1);
|
||||
|
||||
Standalone<StringRef> buf = makeString(len);
|
||||
int n = 0;
|
||||
for (int i = 0; i < idx; i++) {
|
||||
const auto& value = keyValues[i].kv;
|
||||
memcpy(mutateString(buf) + n, value.begin(), value.size());
|
||||
n += value.size();
|
||||
}
|
||||
|
||||
ASSERT(n == len);
|
||||
return buf;
|
||||
}
|
||||
|
||||
// Decodes a block into KeyValueRef stored in "keyValues".
|
||||
void decode_block(const Standalone<StringRef>& buf, int len) {
|
||||
StringRef block(buf.begin(), len);
|
||||
StringRefReader reader(block, restore_corrupted_data());
|
||||
|
||||
try {
|
||||
// Read header, currently only decoding version BACKUP_AGENT_MLOG_VERSION
|
||||
if (reader.consume<int32_t>() != BACKUP_AGENT_MLOG_VERSION)
|
||||
throw restore_unsupported_file_version();
|
||||
|
||||
// Read k/v pairs. Block ends either at end of last value exactly or with 0xFF as first key len byte.
|
||||
while (1) {
|
||||
// If eof reached or first key len bytes is 0xFF then end of block was reached.
|
||||
if (reader.eof() || *reader.rptr == 0xFF)
|
||||
break;
|
||||
|
||||
// Read key and value. If anything throws then there is a problem.
|
||||
uint32_t kLen = reader.consumeNetworkUInt32();
|
||||
const uint8_t* k = reader.consume(kLen);
|
||||
std::pair<Version, int32_t> version_part = decode_key(StringRef(k, kLen));
|
||||
uint32_t vLen = reader.consumeNetworkUInt32();
|
||||
const uint8_t* v = reader.consume(vLen);
|
||||
TraceEvent(SevDecodeInfo, "Block")
|
||||
.detail("KeySize", kLen)
|
||||
.detail("valueSize", vLen)
|
||||
.detail("Offset", reader.rptr - buf.begin())
|
||||
.detail("Version", version_part.first)
|
||||
.detail("Part", version_part.second);
|
||||
keyValues.emplace_back(buf.arena(), version_part.first, version_part.second, StringRef(v, vLen));
|
||||
}
|
||||
|
||||
// Make sure any remaining bytes in the block are 0xFF
|
||||
for (auto b : reader.remainder()) {
|
||||
if (b != 0xFF)
|
||||
throw restore_corrupted_data_padding();
|
||||
}
|
||||
|
||||
// The (version, part) in a block can be out of order, i.e., (3, 0)
|
||||
// can be followed by (4, 0), and then (3, 1). So we need to sort them
|
||||
// first by version, and then by part number.
|
||||
std::sort(keyValues.begin(), keyValues.end(), [](const VersionedKVPart& a, const VersionedKVPart& b) {
|
||||
return a.version == b.version ? a.part < b.part : a.version < b.version;
|
||||
});
|
||||
return;
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "CorruptBlock").error(e).detail("Offset", reader.rptr - buf.begin());
|
||||
throw;
|
||||
// No complete versions
|
||||
if (!mutationBlocksByVersion.empty()) {
|
||||
TraceEvent(SevWarn, "UnfishedBlocks").detail("NumberOfVersions", mutationBlocksByVersion.size());
|
||||
}
|
||||
done = true;
|
||||
return vms;
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> openFileImpl(DecodeProgress* self, Reference<IBackupContainer> container) {
|
||||
Reference<IAsyncFile> fd = wait(container->readFile(self->file.fileName));
|
||||
self->fd = fd;
|
||||
wait(readAndDecodeFile(self));
|
||||
while (!self->eof) {
|
||||
wait(readAndDecodeFile(self));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Add chunks to mutationBlocksByVersion
|
||||
void addBlockKVPairs(VectorRef<KeyValueRef> chunks) {
|
||||
for (auto& kv : chunks) {
|
||||
auto versionAndChunkNumber = fileBackup::decodeMutationLogKey(kv.key);
|
||||
mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv);
|
||||
}
|
||||
}
|
||||
|
||||
// Reads a file block, decodes it into key/value pairs, and stores these pairs.
|
||||
ACTOR static Future<Void> readAndDecodeFile(DecodeProgress* self) {
|
||||
try {
|
||||
|
@ -572,17 +403,18 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
state Standalone<StringRef> buf = makeString(len);
|
||||
state int rLen = wait(self->fd->read(mutateString(buf), len, self->offset));
|
||||
// Decode a file block into log_key and log_value chunks
|
||||
Standalone<VectorRef<KeyValueRef>> chunks =
|
||||
wait(fileBackup::decodeMutationLogFileBlock(self->fd, self->offset, len));
|
||||
self->blocks.push_back(chunks);
|
||||
|
||||
TraceEvent("ReadFile")
|
||||
.detail("Name", self->file.fileName)
|
||||
.detail("Len", rLen)
|
||||
.detail("Len", len)
|
||||
.detail("Offset", self->offset);
|
||||
if (rLen != len) {
|
||||
throw restore_corrupted_data();
|
||||
}
|
||||
self->decode_block(buf, rLen);
|
||||
self->offset += rLen;
|
||||
self->addBlockKVPairs(chunks);
|
||||
self->offset += len;
|
||||
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarn, "CorruptLogFileBlock")
|
||||
|
@ -598,9 +430,52 @@ public:
|
|||
Reference<IAsyncFile> fd;
|
||||
int64_t offset = 0;
|
||||
bool eof = false;
|
||||
bool leftover = false; // Done but has unfinished version batch data left
|
||||
bool done = false;
|
||||
};
|
||||
|
||||
ACTOR Future<Void> process_file(Reference<IBackupContainer> container, LogFile file, UID uid, DecodeParams params) {
|
||||
if (file.fileSize == 0) {
|
||||
TraceEvent("SkipEmptyFile", uid).detail("Name", file.fileName);
|
||||
return Void();
|
||||
}
|
||||
|
||||
state DecodeProgress progress(file);
|
||||
wait(progress.openFile(container));
|
||||
while (!progress.finished()) {
|
||||
VersionedMutations vms = progress.getNextBatch();
|
||||
if (vms.version < params.beginVersionFilter || vms.version >= params.endVersionFilter) {
|
||||
TraceEvent("SkipVersion").detail("Version", vms.version);
|
||||
continue;
|
||||
}
|
||||
|
||||
int sub = 0;
|
||||
for (const auto& m : vms.mutations) {
|
||||
sub++; // sub sequence number starts at 1
|
||||
bool print = params.prefix.empty(); // no filtering
|
||||
|
||||
if (!print) {
|
||||
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
|
||||
print = m.param1.startsWith(StringRef(params.prefix));
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
KeyRange range(KeyRangeRef(m.param1, m.param2));
|
||||
print = range.contains(StringRef(params.prefix));
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
if (print) {
|
||||
TraceEvent(format("Mutation_%llu_%d", vms.version, sub).c_str(), uid)
|
||||
.detail("Version", vms.version)
|
||||
.setMaxFieldLength(10000)
|
||||
.detail("M", m.toString());
|
||||
std::cout << vms.version << " " << m.toString() << "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
TraceEvent("ProcessFileDone", uid).detail("File", file.fileName);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> decode_logs(DecodeParams params) {
|
||||
state Reference<IBackupContainer> container = IBackupContainer::openContainer(params.container_url);
|
||||
state UID uid = deterministicRandom()->randomUniqueID();
|
||||
|
@ -625,49 +500,11 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
|
|||
|
||||
if (params.list_only) return Void();
|
||||
|
||||
state int i = 0;
|
||||
// Previous file's unfinished version data
|
||||
state std::vector<VersionedKVPart> left;
|
||||
for (; i < logs.size(); i++) {
|
||||
if (logs[i].fileSize == 0)
|
||||
continue;
|
||||
|
||||
state DecodeProgress progress(logs[i], std::move(left));
|
||||
wait(progress.openFile(container));
|
||||
while (!progress.finished()) {
|
||||
VersionedMutations vms = wait(progress.getNextBatch());
|
||||
if (vms.version < params.beginVersionFilter || vms.version >= params.endVersionFilter) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int i = 0;
|
||||
for (const auto& m : vms.mutations) {
|
||||
i++; // sub sequence number starts at 1
|
||||
bool print = params.prefix.empty(); // no filtering
|
||||
|
||||
if (!print) {
|
||||
if (isSingleKeyMutation((MutationRef::Type)m.type)) {
|
||||
print = m.param1.startsWith(StringRef(params.prefix));
|
||||
} else if (m.type == MutationRef::ClearRange) {
|
||||
KeyRange range(KeyRangeRef(m.param1, m.param2));
|
||||
print = range.contains(StringRef(params.prefix));
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
if (print) {
|
||||
TraceEvent(format("Mutation_%d_%d", vms.version, i).c_str(), uid)
|
||||
.detail("Version", vms.version)
|
||||
.setMaxFieldLength(10000)
|
||||
.detail("M", m);
|
||||
std::cout << vms.version << " " << m.toString() << "\n";
|
||||
}
|
||||
}
|
||||
}
|
||||
left = std::move(progress).getUnfinishedBuffer();
|
||||
if (!left.empty()) {
|
||||
TraceEvent("UnfinishedFile").detail("File", logs[i].fileName).detail("Q", left.size());
|
||||
}
|
||||
state int idx = 0;
|
||||
while (idx < logs.size()) {
|
||||
TraceEvent("ProcessFile").detail("Name", logs[idx].fileName).detail("I", idx);
|
||||
wait(process_file(container, logs[idx], uid, params));
|
||||
idx++;
|
||||
}
|
||||
TraceEvent("DecodeDone", uid).log();
|
||||
return Void();
|
||||
|
|
|
@@ -1667,7 +1667,7 @@ ACTOR Future<Void> cleanupStatus(Reference<ReadYourWritesTransaction> tr,
readMore = true;
} catch (Error& e) {
// If doc can't be parsed or isn't alive, delete it.
TraceEvent(SevWarn, "RemovedDeadBackupLayerStatus").detail("Key", docs[i].key);
TraceEvent(SevWarn, "RemovedDeadBackupLayerStatus").detail("Key", docs[i].key).error(e, true);
tr->clear(docs[i].key);
// If limit is 1 then read more.
if (limit == 1)

@@ -7,6 +7,7 @@ set(FDBCLI_SRCS
FlowLineNoise.h
ForceRecoveryWithDataLossCommand.actor.cpp
MaintenanceCommand.actor.cpp
SetClassCommand.actor.cpp
SnapshotCommand.actor.cpp
ThrottleCommand.actor.cpp
Util.cpp

@@ -0,0 +1,123 @@
/*
 * SetClassCommand.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdbcli/fdbcli.actor.h"

#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"

#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.

namespace {

ACTOR Future<Void> printProcessClass(Reference<IDatabase> db) {
    state Reference<ITransaction> tr = db->createTransaction();
    loop {
        tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
        try {
            // Hold the reference to the memory
            state ThreadFuture<RangeResult> classTypeFuture =
                tr->getRange(fdb_cli::processClassTypeSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
            state ThreadFuture<RangeResult> classSourceFuture =
                tr->getRange(fdb_cli::processClassSourceSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
            wait(success(safeThreadFutureToFuture(classSourceFuture)) &&
                 success(safeThreadFutureToFuture(classTypeFuture)));
            RangeResult processTypeList = classTypeFuture.get();
            RangeResult processSourceList = classSourceFuture.get();
            ASSERT(processSourceList.size() == processTypeList.size());
            if (!processTypeList.size())
                printf("No processes are registered in the database.\n");
            printf("There are currently %zu processes in the database:\n", processTypeList.size());
            for (int index = 0; index < processTypeList.size(); index++) {
                std::string address =
                    processTypeList[index].key.removePrefix(fdb_cli::processClassTypeSpecialKeyRange.begin).toString();
                // check the addresses are the same in each list
                std::string addressFromSourceList =
                    processSourceList[index]
                        .key.removePrefix(fdb_cli::processClassSourceSpecialKeyRange.begin)
                        .toString();
                ASSERT(address == addressFromSourceList);
                printf(" %s: %s (%s)\n",
                       address.c_str(),
                       processTypeList[index].value.toString().c_str(),
                       processSourceList[index].value.toString().c_str());
            }
            return Void();
        } catch (Error& e) {
            wait(safeThreadFutureToFuture(tr->onError(e)));
        }
    }
};

ACTOR Future<bool> setProcessClass(Reference<IDatabase> db, KeyRef network_address, KeyRef class_type) {
    state Reference<ITransaction> tr = db->createTransaction();
    loop {
        tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
        try {
            tr->set(network_address.withPrefix(fdb_cli::processClassTypeSpecialKeyRange.begin), class_type);
            wait(safeThreadFutureToFuture(tr->commit()));
            return true;
        } catch (Error& e) {
            wait(safeThreadFutureToFuture(tr->onError(e)));
        }
    }
}

} // namespace

namespace fdb_cli {

const KeyRangeRef processClassSourceSpecialKeyRange =
    KeyRangeRef(LiteralStringRef("\xff\xff/configuration/process/class_source/"),
                LiteralStringRef("\xff\xff/configuration/process/class_source0"));

const KeyRangeRef processClassTypeSpecialKeyRange =
    KeyRangeRef(LiteralStringRef("\xff\xff/configuration/process/class_type/"),
                LiteralStringRef("\xff\xff/configuration/process/class_type0"));

ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
    if (tokens.size() != 3 && tokens.size() != 1) {
        printUsage(tokens[0]);
        return false;
    } else if (tokens.size() == 1) {
        wait(printProcessClass(db));
    } else {
        bool successful = wait(setProcessClass(db, tokens[1], tokens[2]));
        return successful;
    }
    return true;
}

CommandFactory setClassFactory(
    "setclass",
    CommandHelp("setclass [<ADDRESS> <CLASS>]",
                "change the class of a process",
                "If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
                "`default' resets the process class to the class specified on the command line. The available "
                "classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
                "`master', `test', "
                "`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
                "`coordinator', `ratekeeper', `storage_cache', `backup', and `default'."));

} // namespace fdb_cli

@@ -566,15 +566,6 @@ void initHelp() {
"pair in <ADDRESS...> or any LocalityData (like dcid, zoneid, machineid, processid), removes any "
"matching exclusions from the excluded servers and localities list. "
"(A specified IP will match all IP:* exclusion entries)");
helpMap["setclass"] =
    CommandHelp("setclass [<ADDRESS> <CLASS>]",
                "change the class of a process",
                "If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
                "`default' resets the process class to the class specified on the command line. The available "
                "classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
                "`master', `test', "
                "`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
                "`coordinator', `ratekeeper', `storage_cache', `backup', and `default'.");
helpMap["status"] =
    CommandHelp("status [minimal|details|json]",
                "get the status of a FoundationDB cluster",

@@ -2742,45 +2733,6 @@ ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens) {
    return false;
}

ACTOR Future<bool> setClass(Database db, std::vector<StringRef> tokens) {
    if (tokens.size() == 1) {
        vector<ProcessData> _workers = wait(makeInterruptable(getWorkers(db)));
        auto workers = _workers; // strip const

        if (!workers.size()) {
            printf("No processes are registered in the database.\n");
            return false;
        }

        std::sort(workers.begin(), workers.end(), ProcessData::sort_by_address());

        printf("There are currently %zu processes in the database:\n", workers.size());
        for (const auto& w : workers)
            printf(" %s: %s (%s)\n",
                   w.address.toString().c_str(),
                   w.processClass.toString().c_str(),
                   w.processClass.sourceString().c_str());
        return false;
    }

    AddressExclusion addr = AddressExclusion::parse(tokens[1]);
    if (!addr.isValid()) {
        fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str());
        if (tokens[1].toString().find(":tls") != std::string::npos)
            printf(" Do not include the `:tls' suffix when naming a process\n");
        return true;
    }

    ProcessClass processClass(tokens[2].toString(), ProcessClass::DBSource);
    if (processClass.classType() == ProcessClass::InvalidClass && tokens[2] != LiteralStringRef("default")) {
        fprintf(stderr, "ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str());
        return true;
    }

    wait(makeInterruptable(setClass(db, addr, processClass)));
    return false;
};

Reference<ReadYourWritesTransaction> getTransaction(Database db,
                                                    Reference<ReadYourWritesTransaction>& tr,
                                                    FdbOptions* options,

@@ -3689,14 +3641,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}

if (tokencmp(tokens[0], "setclass")) {
if (tokens.size() != 3 && tokens.size() != 1) {
printUsage(tokens[0]);
bool _result = wait(makeInterruptable(setClassCommandActor(db2, tokens)));
if (!_result)
is_error = true;
} else {
bool err = wait(setClass(db, tokens));
if (err)
is_error = true;
}
continue;
}

@ -64,7 +64,9 @@ extern const KeyRef consistencyCheckSpecialKey;
|
|||
// maintenance
|
||||
extern const KeyRangeRef maintenanceSpecialKeyRange;
|
||||
extern const KeyRef ignoreSSFailureSpecialKey;
|
||||
|
||||
// setclass
|
||||
extern const KeyRangeRef processClassSourceSpecialKeyRange;
|
||||
extern const KeyRangeRef processClassTypeSpecialKeyRange;
|
||||
// help functions (Copied from fdbcli.actor.cpp)
|
||||
|
||||
// compare StringRef with the given c string
|
||||
|
@ -81,6 +83,8 @@ ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr, std:
|
|||
ACTOR Future<bool> forceRecoveryWithDataLossCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// maintenance command
|
||||
ACTOR Future<bool> maintenanceCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// setclass command
|
||||
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// snapshot command
|
||||
ACTOR Future<bool> snapshotCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// throttle command
@ -18,6 +18,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "fdbclient/AsyncTaskThread.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -30,13 +32,22 @@ public:
|
|||
bool isTerminate() const override { return true; }
|
||||
};
|
||||
|
||||
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, int* sum, int count) {
|
||||
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, std::atomic<int> *sum, int count, int clientId, double meanSleep) {
|
||||
state int i = 0;
|
||||
state double randomSleep = 0.0;
|
||||
for (; i < count; ++i) {
|
||||
randomSleep = deterministicRandom()->random01() * 2 * meanSleep;
|
||||
wait(delay(randomSleep));
|
||||
wait(asyncTaskThread->execAsync([sum = sum] {
|
||||
++(*sum);
|
||||
sum->fetch_add(1);
|
||||
return Void();
|
||||
}));
|
||||
TraceEvent("AsyncTaskThreadIncrementedSum")
|
||||
.detail("Index", i)
|
||||
.detail("Sum", sum->load())
|
||||
.detail("ClientId", clientId)
|
||||
.detail("RandomSleep", randomSleep)
|
||||
.detail("MeanSleep", meanSleep);
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -51,7 +62,7 @@ AsyncTaskThread::~AsyncTaskThread() {
|
|||
bool wakeUp = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> g(m);
|
||||
wakeUp = queue.push(std::make_shared<TerminateTask>());
|
||||
wakeUp = queue.push(std::make_unique<TerminateTask>());
|
||||
}
|
||||
if (wakeUp) {
|
||||
cv.notify_one();
|
||||
|
@ -61,7 +72,7 @@ AsyncTaskThread::~AsyncTaskThread() {
|
|||
|
||||
void AsyncTaskThread::run(AsyncTaskThread* self) {
|
||||
while (true) {
|
||||
std::shared_ptr<IAsyncTask> task;
|
||||
std::unique_ptr<IAsyncTask> task;
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(self->m);
|
||||
self->cv.wait(lk, [self] { return !self->queue.canSleep(); });
|
||||
|
@ -75,14 +86,30 @@ void AsyncTaskThread::run(AsyncTaskThread* self) {
|
|||
}
|
||||
|
||||
TEST_CASE("/asynctaskthread/add") {
|
||||
state int sum = 0;
|
||||
state std::atomic<int> sum = 0;
|
||||
state AsyncTaskThread asyncTaskThread;
|
||||
state int numClients = 10;
|
||||
state int incrementsPerClient = 100;
|
||||
std::vector<Future<Void>> clients;
|
||||
clients.reserve(10);
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, 100));
|
||||
clients.reserve(numClients);
|
||||
for (int clientId = 0; clientId < numClients; ++clientId) {
|
||||
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, incrementsPerClient, clientId, deterministicRandom()->random01() * 0.01));
|
||||
}
|
||||
wait(waitForAll(clients));
|
||||
ASSERT_EQ(sum, 1000);
|
||||
ASSERT_EQ(sum.load(), numClients * incrementsPerClient);
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/asynctaskthread/error") {
|
||||
state AsyncTaskThread asyncTaskThread;
|
||||
try {
|
||||
wait(asyncTaskThread.execAsync([]{
|
||||
throw operation_failed();
|
||||
return Void();
|
||||
}));
|
||||
ASSERT(false);
|
||||
} catch (Error &e) {
|
||||
ASSERT_EQ(e.code(), error_code_operation_failed);
|
||||
}
|
||||
return Void();
|
||||
}
@ -48,7 +48,7 @@ public:
|
|||
};
|
||||
|
||||
class AsyncTaskThread {
|
||||
ThreadSafeQueue<std::shared_ptr<IAsyncTask>> queue;
|
||||
ThreadSafeQueue<std::unique_ptr<IAsyncTask>> queue;
|
||||
std::condition_variable cv;
|
||||
std::mutex m;
|
||||
std::thread thread;
|
||||
|
@ -60,7 +60,7 @@ class AsyncTaskThread {
|
|||
bool wakeUp = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> g(m);
|
||||
wakeUp = queue.push(std::make_shared<AsyncTask<F>>(func));
|
||||
wakeUp = queue.push(std::make_unique<AsyncTask<F>>(func));
|
||||
}
|
||||
if (wakeUp) {
|
||||
cv.notify_one();
|
||||
|
@ -88,6 +88,7 @@ public:
|
|||
auto funcResult = func();
|
||||
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
|
||||
} catch (Error& e) {
|
||||
TraceEvent("ErrorExecutingAsyncTask").error(e);
|
||||
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
|
||||
}
|
||||
});
@ -717,11 +717,22 @@ protected:
|
|||
|
||||
template <>
|
||||
inline Tuple Codec<Reference<IBackupContainer>>::pack(Reference<IBackupContainer> const& bc) {
|
||||
return Tuple().append(StringRef(bc->getURL()));
|
||||
Tuple tuple;
|
||||
tuple.append(StringRef(bc->getURL()));
|
||||
if (bc->getEncryptionKeyFileName().present()) {
|
||||
tuple.append(bc->getEncryptionKeyFileName().get());
|
||||
}
|
||||
return tuple;
|
||||
}
|
||||
template <>
|
||||
inline Reference<IBackupContainer> Codec<Reference<IBackupContainer>>::unpack(Tuple const& val) {
|
||||
return IBackupContainer::openContainer(val.getString(0).toString());
|
||||
ASSERT(val.size() == 1 || val.size() == 2);
|
||||
auto url = val.getString(0).toString();
|
||||
Optional<std::string> encryptionKeyFileName;
|
||||
if (val.size() == 2) {
|
||||
encryptionKeyFileName = val.getString(1).toString();
|
||||
}
|
||||
return IBackupContainer::openContainer(url, encryptionKeyFileName);
|
||||
}
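For illustration, a minimal sketch of how the updated Codec round-trips a container reference through a Tuple (the URL and key file path below are hypothetical):

// Sketch only: assumes a container opened with an encryption key file.
Reference<IBackupContainer> bc =
    IBackupContainer::openContainer("file:///tmp/backup", Optional<std::string>("/etc/fdb/backup.key"));
Tuple t = Codec<Reference<IBackupContainer>>::pack(bc);
// t holds [url, encryptionKeyFileName]; states written by older code hold only [url].
Reference<IBackupContainer> bc2 = Codec<Reference<IBackupContainer>>::unpack(t);
ASSERT(bc2->getURL() == bc->getURL());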
|
||||
|
||||
class BackupConfig : public KeyBackedConfig {
|
||||
|
@ -970,6 +981,11 @@ ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeRangeFileBlock(Reference<
|
|||
int64_t offset,
|
||||
int len);
|
||||
|
||||
// Reads a mutation log block from file and parses into batch mutation blocks for further parsing.
|
||||
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeMutationLogFileBlock(Reference<IAsyncFile> file,
|
||||
int64_t offset,
|
||||
int len);
|
||||
|
||||
// Return a block of contiguous padding bytes "\0xff" for backup files, growing if needed.
|
||||
Value makePadding(int size);
|
||||
} // namespace fileBackup
@ -284,11 +284,11 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
|
|||
#ifdef BUILD_AZURE_BACKUP
|
||||
else if (u.startsWith("azure://"_sr)) {
|
||||
u.eat("azure://"_sr);
|
||||
auto address = NetworkAddress::parse(u.eat("/"_sr).toString());
|
||||
auto accountName = u.eat("@"_sr).toString();
|
||||
auto endpoint = u.eat("/"_sr).toString();
|
||||
auto containerName = u.eat("/"_sr).toString();
|
||||
auto accountName = u.eat("/"_sr).toString();
|
||||
r = makeReference<BackupContainerAzureBlobStore>(
|
||||
address, containerName, accountName, encryptionKeyFileName);
|
||||
endpoint, accountName, containerName, encryptionKeyFileName);
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
|
@ -296,6 +296,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
|
|||
throw backup_invalid_url();
|
||||
}
|
||||
|
||||
r->encryptionKeyFileName = encryptionKeyFileName;
|
||||
r->URL = url;
|
||||
return r;
|
||||
} catch (Error& e) {
@ -298,12 +298,53 @@ public:
|
|||
static std::vector<std::string> getURLFormats();
|
||||
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);
|
||||
|
||||
std::string getURL() const { return URL; }
|
||||
std::string const &getURL() const { return URL; }
|
||||
Optional<std::string> const &getEncryptionKeyFileName() const { return encryptionKeyFileName; }
|
||||
|
||||
static std::string lastOpenError;
|
||||
|
||||
private:
|
||||
std::string URL;
|
||||
Optional<std::string> encryptionKeyFileName;
|
||||
};
|
||||
|
||||
namespace fileBackup {
|
||||
// Accumulates mutation log value chunks, as both a vector of chunks and as a combined chunk,
|
||||
// in chunk order, and can check the chunk set for completion or intersection with a set
|
||||
// of ranges.
|
||||
struct AccumulatedMutations {
|
||||
AccumulatedMutations() : lastChunkNumber(-1) {}
|
||||
|
||||
// Add a KV pair for this mutation chunk set
|
||||
// It will be accumulated onto serializedMutations if the chunk number is
|
||||
// the next expected value.
|
||||
void addChunk(int chunkNumber, const KeyValueRef& kv);
|
||||
|
||||
// Returns true if both
|
||||
// - 1 or more chunks were added to this set
|
||||
// - The header of the first chunk contains a valid protocol version and a length
|
||||
// that matches the bytes after the header in the combined value in serializedMutations
|
||||
bool isComplete() const;
|
||||
|
||||
// Returns true if a complete chunk contains any MutationRefs which intersect with any
|
||||
// range in ranges.
|
||||
// It is undefined behavior to run this if isComplete() does not return true.
|
||||
bool matchesAnyRange(const std::vector<KeyRange>& ranges) const;
|
||||
|
||||
std::vector<KeyValueRef> kvs;
|
||||
std::string serializedMutations;
|
||||
int lastChunkNumber;
|
||||
};
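A rough usage sketch (assuming kvs holds decoded log-block pairs for a single commit version and restoreRanges is the restore range set):

// Sketch only: one version group for simplicity; the real code buckets chunks by commit version.
AccumulatedMutations acc;
for (const KeyValueRef& kv : kvs) {
    int32_t chunkNumber = decodeMutationLogKey(kv.key).second;
    acc.addChunk(chunkNumber, kv);
}
if (acc.isComplete() && acc.matchesAnyRange(restoreRanges)) {
    // The chunk group is complete and relevant to the restore; keep acc.kvs.
}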
|
||||
|
||||
// Decodes a mutation log key, which contains (hash, commitVersion, chunkNumber) and
|
||||
// returns (commitVersion, chunkNumber)
|
||||
std::pair<Version, int32_t> decodeMutationLogKey(const StringRef& key);
|
||||
|
||||
// Decodes an encoded list of mutations in the format of:
|
||||
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
|
||||
// where a mutation is encoded as:
|
||||
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2]
|
||||
std::vector<MutationRef> decodeMutationLogValue(const StringRef& value);
|
||||
} // namespace fileBackup
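A hedged sketch of a decoder following the key layout described above (assumed layout and byte order; not the exact implementation):

// Assumed layout: [hash:uint8_t][commitVersion:Version][chunkNumber:int32_t], big-endian integers.
std::pair<Version, int32_t> decodeMutationLogKeySketch(const StringRef& key) {
    ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
    Version version;
    int32_t chunkNumber;
    memcpy(&version, key.begin() + sizeof(uint8_t), sizeof(version));
    memcpy(&chunkNumber, key.begin() + sizeof(uint8_t) + sizeof(version), sizeof(chunkNumber));
    return { bigEndian64(version), bigEndian32(chunkNumber) };
}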
|
||||
|
||||
#endif
@ -20,37 +20,69 @@
|
|||
|
||||
#include "fdbclient/BackupContainerAzureBlobStore.h"
|
||||
#include "fdbrpc/AsyncFileEncrypted.h"
|
||||
#include <future>
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
|
||||
std::string const notFoundErrorCode = "404";
|
||||
|
||||
void printAzureError(std::string const& operationName, azure::storage_lite::storage_error const& err) {
|
||||
printf("(%s) : Error from Azure SDK : %s (%s) : %s",
|
||||
operationName.c_str(),
|
||||
err.code_name.c_str(),
|
||||
err.code.c_str(),
|
||||
err.message.c_str());
|
||||
}
|
||||
|
||||
template <class T>
|
||||
T waitAzureFuture(std::future<azure::storage_lite::storage_outcome<T>>&& f, std::string const& operationName) {
|
||||
auto outcome = f.get();
|
||||
if (outcome.success()) {
|
||||
return outcome.response();
|
||||
} else {
|
||||
printAzureError(operationName, outcome.error());
|
||||
throw backup_error();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
class BackupContainerAzureBlobStoreImpl {
|
||||
public:
|
||||
using AzureClient = azure::storage_lite::blob_client;
|
||||
|
||||
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
|
||||
AsyncTaskThread& asyncTaskThread;
|
||||
AsyncTaskThread* asyncTaskThread;
|
||||
std::string containerName;
|
||||
std::string blobName;
|
||||
AzureClient* client;
|
||||
std::shared_ptr<AzureClient> client;
|
||||
|
||||
public:
|
||||
ReadFile(AsyncTaskThread& asyncTaskThread,
|
||||
const std::string& containerName,
|
||||
const std::string& blobName,
|
||||
AzureClient* client)
|
||||
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
std::shared_ptr<AzureClient> const& client)
|
||||
: asyncTaskThread(&asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
|
||||
void addref() override { ReferenceCounted<ReadFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<ReadFile>::delref(); }
|
||||
Future<int> read(void* data, int length, int64_t offset) {
|
||||
return asyncTaskThread.execAsync([client = this->client,
|
||||
containerName = this->containerName,
|
||||
blobName = this->blobName,
|
||||
data,
|
||||
length,
|
||||
offset] {
|
||||
Future<int> read(void* data, int length, int64_t offset) override {
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreRead")
|
||||
.detail("Length", length)
|
||||
.detail("Offset", offset)
|
||||
.detail("ContainerName", containerName)
|
||||
.detail("BlobName", blobName);
|
||||
return asyncTaskThread->execAsync([client = this->client,
|
||||
containerName = this->containerName,
|
||||
blobName = this->blobName,
|
||||
data,
|
||||
length,
|
||||
offset] {
|
||||
std::ostringstream oss(std::ios::out | std::ios::binary);
|
||||
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
|
||||
waitAzureFuture(client->download_blob_to_stream(containerName, blobName, offset, length, oss),
|
||||
"download_blob_to_stream");
|
||||
auto str = std::move(oss).str();
|
||||
memcpy(data, str.c_str(), str.size());
|
||||
return static_cast<int>(str.size());
|
||||
|
@ -61,19 +93,23 @@ public:
|
|||
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
|
||||
Future<Void> sync() override { throw file_not_writable(); }
|
||||
Future<int64_t> size() const override {
|
||||
return asyncTaskThread.execAsync([client = this->client,
|
||||
containerName = this->containerName,
|
||||
blobName = this->blobName] {
|
||||
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
|
||||
});
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreReadFileSize")
|
||||
.detail("ContainerName", containerName)
|
||||
.detail("BlobName", blobName);
|
||||
return asyncTaskThread->execAsync(
|
||||
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
|
||||
auto resp =
|
||||
waitAzureFuture(client->get_blob_properties(containerName, blobName), "get_blob_properties");
|
||||
return static_cast<int64_t>(resp.size);
|
||||
});
|
||||
}
|
||||
std::string getFilename() const override { return blobName; }
|
||||
int64_t debugFD() const override { return 0; }
|
||||
};
|
||||
|
||||
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
|
||||
AsyncTaskThread& asyncTaskThread;
|
||||
AzureClient* client;
|
||||
AsyncTaskThread* asyncTaskThread;
|
||||
std::shared_ptr<AzureClient> client;
|
||||
std::string containerName;
|
||||
std::string blobName;
|
||||
int64_t m_cursor{ 0 };
|
||||
|
@ -88,8 +124,8 @@ public:
|
|||
WriteFile(AsyncTaskThread& asyncTaskThread,
|
||||
const std::string& containerName,
|
||||
const std::string& blobName,
|
||||
AzureClient* client)
|
||||
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
std::shared_ptr<AzureClient> const& client)
|
||||
: asyncTaskThread(&asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
|
||||
|
||||
void addref() override { ReferenceCounted<WriteFile>::addref(); }
|
||||
void delref() override { ReferenceCounted<WriteFile>::delref(); }
|
||||
|
@ -114,22 +150,33 @@ public:
|
|||
return Void();
|
||||
}
|
||||
Future<Void> sync() override {
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreSync")
|
||||
.detail("Length", buffer.size())
|
||||
.detail("ContainerName", containerName)
|
||||
.detail("BlobName", blobName);
|
||||
auto movedBuffer = std::move(buffer);
|
||||
buffer.clear();
|
||||
return asyncTaskThread.execAsync([client = this->client,
|
||||
containerName = this->containerName,
|
||||
blobName = this->blobName,
|
||||
buffer = std::move(movedBuffer)] {
|
||||
std::istringstream iss(std::move(buffer));
|
||||
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
|
||||
return Void();
|
||||
});
|
||||
buffer = {};
|
||||
if (!movedBuffer.empty()) {
|
||||
return asyncTaskThread->execAsync([client = this->client,
|
||||
containerName = this->containerName,
|
||||
blobName = this->blobName,
|
||||
buffer = std::move(movedBuffer)] {
|
||||
std::istringstream iss(std::move(buffer));
|
||||
waitAzureFuture(client->append_block_from_stream(containerName, blobName, iss),
|
||||
"append_block_from_stream");
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
Future<int64_t> size() const override {
|
||||
return asyncTaskThread.execAsync(
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreSize")
|
||||
.detail("ContainerName", containerName)
|
||||
.detail("BlobName", blobName);
|
||||
return asyncTaskThread->execAsync(
|
||||
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
|
||||
auto resp = client->get_blob_properties(containerName, blobName).get().response();
|
||||
ASSERT(resp.valid()); // TODO: Should instead throw here
|
||||
auto resp =
|
||||
waitAzureFuture(client->get_blob_properties(containerName, blobName), "get_blob_properties");
|
||||
return static_cast<int64_t>(resp.size);
|
||||
});
|
||||
}
|
||||
|
@ -163,44 +210,53 @@ public:
|
|||
|
||||
static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; }
|
||||
|
||||
// Hack to get around the fact that macros don't work inside actor functions
|
||||
static Reference<IAsyncFile> encryptFile(Reference<IAsyncFile> const& f, AsyncFileEncrypted::Mode mode) {
|
||||
Reference<IAsyncFile> result = f;
|
||||
#if ENCRYPTION_ENABLED
|
||||
result = makeReference<AsyncFileEncrypted>(result, mode);
|
||||
#endif
|
||||
return result;
|
||||
}
|
||||
|
||||
ACTOR static Future<Reference<IAsyncFile>> readFile(BackupContainerAzureBlobStore* self, std::string fileName) {
|
||||
bool exists = wait(self->blobExists(fileName));
|
||||
if (!exists) {
|
||||
throw file_not_found();
|
||||
}
|
||||
Reference<IAsyncFile> f =
|
||||
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
|
||||
#if ENCRYPTION_ENABLED
|
||||
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client);
|
||||
if (self->usesEncryption()) {
|
||||
f = makeReference<AsyncFileEncrypted>(f, false);
|
||||
f = encryptFile(f, AsyncFileEncrypted::Mode::READ_ONLY);
|
||||
}
|
||||
#endif
|
||||
return f;
|
||||
}
|
||||
|
||||
ACTOR static Future<Reference<IBackupFile>> writeFile(BackupContainerAzureBlobStore* self, std::string fileName) {
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreCreateWriteFile")
|
||||
.detail("ContainerName", self->containerName)
|
||||
.detail("FileName", fileName);
|
||||
wait(self->asyncTaskThread.execAsync(
|
||||
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
|
||||
auto outcome = client->create_append_blob(containerName, fileName).get();
|
||||
[client = self->client, containerName = self->containerName, fileName = fileName] {
|
||||
waitAzureFuture(client->create_append_blob(containerName, fileName), "create_append_blob");
|
||||
return Void();
|
||||
}));
|
||||
auto f = makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
|
||||
#if ENCRYPTION_ENABLED
|
||||
Reference<IAsyncFile> f =
|
||||
makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client);
|
||||
if (self->usesEncryption()) {
|
||||
f = makeReference<AsyncFileEncrypted>(f, true);
|
||||
f = encryptFile(f, AsyncFileEncrypted::Mode::APPEND_ONLY);
|
||||
}
|
||||
#endif
|
||||
return makeReference<BackupFile>(fileName, f);
|
||||
}
|
||||
|
||||
static void listFiles(AzureClient* client,
|
||||
static void listFiles(std::shared_ptr<AzureClient> const& client,
|
||||
const std::string& containerName,
|
||||
const std::string& path,
|
||||
std::function<bool(std::string const&)> folderPathFilter,
|
||||
BackupContainerFileSystem::FilesAndSizesT& result) {
|
||||
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
|
||||
auto resp = waitAzureFuture(client->list_blobs_segmented(containerName, "/", "", path), "list_blobs_segmented");
|
||||
for (const auto& blob : resp.blobs) {
|
||||
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
|
||||
if (isDirectory(blob.name) && (!folderPathFilter || folderPathFilter(blob.name))) {
|
||||
listFiles(client, containerName, blob.name, folderPathFilter, result);
|
||||
} else {
|
||||
result.emplace_back(blob.name, blob.content_length);
|
||||
|
@ -214,8 +270,12 @@ public:
|
|||
BackupContainerFileSystem::FilesAndSizesT files = wait(self->listFiles());
|
||||
filesToDelete = files.size();
|
||||
}
|
||||
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
|
||||
client->delete_container(containerName).wait();
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreDeleteContainer")
|
||||
.detail("FilesToDelete", filesToDelete)
|
||||
.detail("ContainerName", self->containerName)
|
||||
.detail("TrackNumDeleted", pNumDeleted != nullptr);
|
||||
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client] {
|
||||
waitAzureFuture(client->delete_container(containerName), "delete_container");
|
||||
return Void();
|
||||
}));
|
||||
if (pNumDeleted) {
|
||||
|
@ -224,36 +284,44 @@ public:
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> create(BackupContainerAzureBlobStore* self) {
|
||||
state Future<Void> f1 =
|
||||
self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
|
||||
client->create_container(containerName).wait();
|
||||
return Void();
|
||||
});
|
||||
state Future<Void> f2 = self->usesEncryption() ? self->encryptionSetupComplete() : Void();
|
||||
return f1 && f2;
|
||||
}
|
||||
};
|
||||
|
||||
Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileName) {
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
|
||||
auto resp = client->get_blob_properties(containerName, fileName).get().response();
|
||||
return resp.valid();
|
||||
});
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreCheckExists")
|
||||
.detail("FileName", fileName)
|
||||
.detail("ContainerName", containerName);
|
||||
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName, fileName = fileName] {
|
||||
auto outcome = client->get_blob_properties(containerName, fileName).get();
|
||||
if (outcome.success()) {
|
||||
return true;
|
||||
} else {
|
||||
auto const& err = outcome.error();
|
||||
if (err.code == notFoundErrorCode) {
|
||||
return false;
|
||||
} else {
|
||||
printAzureError("get_blob_properties", err);
|
||||
throw backup_error();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const NetworkAddress& address,
|
||||
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const std::string& endpoint,
|
||||
const std::string& accountName,
|
||||
const std::string& containerName,
|
||||
const Optional<std::string>& encryptionKeyFileName)
|
||||
: containerName(containerName) {
|
||||
setEncryptionKey(encryptionKeyFileName);
|
||||
std::string accountKey = std::getenv("AZURE_KEY");
|
||||
const char* _accountKey = std::getenv("AZURE_KEY");
|
||||
if (!_accountKey) {
|
||||
TraceEvent(SevError, "EnvironmentVariableNotFound").detail("EnvVariable", "AZURE_KEY");
|
||||
// TODO: More descriptive error?
|
||||
throw backup_error();
|
||||
}
|
||||
std::string accountKey = _accountKey;
|
||||
auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(accountName, accountKey);
|
||||
auto storageAccount = std::make_shared<azure::storage_lite::storage_account>(
|
||||
accountName, credential, false, format("http://%s/%s", address.toString().c_str(), accountName.c_str()));
|
||||
|
||||
accountName, credential, true, format("https://%s", endpoint.c_str()));
|
||||
client = std::make_unique<AzureClient>(storageAccount, 1);
|
||||
}
|
||||
|
||||
|
@ -265,12 +333,30 @@ void BackupContainerAzureBlobStore::delref() {
|
|||
}
|
||||
|
||||
Future<Void> BackupContainerAzureBlobStore::create() {
|
||||
return BackupContainerAzureBlobStoreImpl::create(this);
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreCreateContainer").detail("ContainerName", containerName);
|
||||
Future<Void> createContainerFuture =
|
||||
asyncTaskThread.execAsync([containerName = this->containerName, client = this->client] {
|
||||
waitAzureFuture(client->create_container(containerName), "create_container");
|
||||
return Void();
|
||||
});
|
||||
Future<Void> encryptionSetupFuture = usesEncryption() ? encryptionSetupComplete() : Void();
|
||||
return createContainerFuture && encryptionSetupFuture;
|
||||
}
|
||||
Future<bool> BackupContainerAzureBlobStore::exists() {
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
|
||||
auto resp = client->get_container_properties(containerName).get().response();
|
||||
return resp.valid();
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreCheckContainerExists").detail("ContainerName", containerName);
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client] {
|
||||
auto outcome = client->get_container_properties(containerName).get();
|
||||
if (outcome.success()) {
|
||||
return true;
|
||||
} else {
|
||||
auto const& err = outcome.error();
|
||||
if (err.code == notFoundErrorCode) {
|
||||
return false;
|
||||
} else {
|
||||
printAzureError("got_container_properties", err);
|
||||
throw backup_error();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
@ -285,22 +371,23 @@ Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const st
|
|||
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
|
||||
const std::string& path,
|
||||
std::function<bool(std::string const&)> folderPathFilter) {
|
||||
return asyncTaskThread.execAsync([client = this->client.get(),
|
||||
containerName = this->containerName,
|
||||
path = path,
|
||||
folderPathFilter = folderPathFilter] {
|
||||
FilesAndSizesT result;
|
||||
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
|
||||
return result;
|
||||
});
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreListFiles").detail("ContainerName", containerName).detail("Path", path);
|
||||
return asyncTaskThread.execAsync(
|
||||
[client = this->client, containerName = this->containerName, path = path, folderPathFilter = folderPathFilter] {
|
||||
FilesAndSizesT result;
|
||||
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> BackupContainerAzureBlobStore::deleteFile(const std::string& fileName) {
|
||||
return asyncTaskThread.execAsync(
|
||||
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
|
||||
client->delete_blob(containerName, fileName).wait();
|
||||
return Void();
|
||||
});
|
||||
TraceEvent(SevDebug, "BCAzureBlobStoreDeleteFile")
|
||||
.detail("ContainerName", containerName)
|
||||
.detail("FileName", fileName);
|
||||
return asyncTaskThread.execAsync([containerName = this->containerName, fileName = fileName, client = client]() {
|
||||
client->delete_blob(containerName, fileName).wait();
|
||||
return Void();
|
||||
});
|
||||
}
|
||||
|
||||
Future<Void> BackupContainerAzureBlobStore::deleteContainer(int* pNumDeleted) {
@ -313,5 +400,5 @@ Future<std::vector<std::string>> BackupContainerAzureBlobStore::listURLs(const s
|
|||
}
|
||||
|
||||
std::string BackupContainerAzureBlobStore::getURLFormat() {
|
||||
return "azure://<ip>:<port>/<accountname>/<container>/<path_to_file>";
|
||||
return "azure://<accountname>@<endpoint>/<container>/";
|
||||
}
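For example (hypothetical account and endpoint), a URL in the new format is split by the parser in openContainer as follows:

// azure://myaccount@myaccount.blob.core.windows.net/backups/
//   accountName   = "myaccount"                          (text before '@')
//   endpoint      = "myaccount.blob.core.windows.net"    (text up to the next '/')
//   containerName = "backups"
// The account key is still read from the AZURE_KEY environment variable when the container is constructed.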
@ -33,7 +33,7 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
|
|||
ReferenceCounted<BackupContainerAzureBlobStore> {
|
||||
using AzureClient = azure::storage_lite::blob_client;
|
||||
|
||||
std::unique_ptr<AzureClient> client;
|
||||
std::shared_ptr<AzureClient> client;
|
||||
std::string containerName;
|
||||
AsyncTaskThread asyncTaskThread;
|
||||
|
||||
|
@ -42,7 +42,7 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
|
|||
friend class BackupContainerAzureBlobStoreImpl;
|
||||
|
||||
public:
|
||||
BackupContainerAzureBlobStore(const NetworkAddress& address,
|
||||
BackupContainerAzureBlobStore(const std::string& endpoint,
|
||||
const std::string& accountName,
|
||||
const std::string& containerName,
|
||||
const Optional<std::string>& encryptionKeyFileName);
@ -691,9 +691,9 @@ private:
|
|||
int64_t blockEnd;
|
||||
};
|
||||
|
||||
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeLogFileBlock(Reference<IAsyncFile> file,
|
||||
int64_t offset,
|
||||
int len) {
|
||||
ACTOR Future<Standalone<VectorRef<KeyValueRef>>> decodeMutationLogFileBlock(Reference<IAsyncFile> file,
|
||||
int64_t offset,
|
||||
int len) {
|
||||
state Standalone<StringRef> buf = makeString(len);
|
||||
int rLen = wait(file->read(mutateString(buf), len, offset));
|
||||
if (rLen != len)
|
||||
|
@ -3244,7 +3244,7 @@ REGISTER_TASKFUNC(RestoreRangeTaskFunc);
|
|||
|
||||
// Decodes a mutation log key, which contains (hash, commitVersion, chunkNumber) and
|
||||
// returns (commitVersion, chunkNumber)
|
||||
std::pair<Version, int32_t> decodeLogKey(const StringRef& key) {
|
||||
std::pair<Version, int32_t> decodeMutationLogKey(const StringRef& key) {
|
||||
ASSERT(key.size() == sizeof(uint8_t) + sizeof(Version) + sizeof(int32_t));
|
||||
|
||||
uint8_t hash;
|
||||
|
@ -3265,7 +3265,7 @@ std::pair<Version, int32_t> decodeLogKey(const StringRef& key) {
|
|||
// [includeVersion:uint64_t][val_length:uint32_t][mutation_1][mutation_2]...[mutation_k],
|
||||
// where a mutation is encoded as:
|
||||
// [type:uint32_t][keyLength:uint32_t][valueLength:uint32_t][param1][param2]
|
||||
std::vector<MutationRef> decodeLogValue(const StringRef& value) {
|
||||
std::vector<MutationRef> decodeMutationLogValue(const StringRef& value) {
|
||||
StringRefReader reader(value, restore_corrupted_data());
|
||||
|
||||
Version protocolVersion = reader.consume<uint64_t>();
|
||||
|
@ -3300,72 +3300,54 @@ std::vector<MutationRef> decodeLogValue(const StringRef& value) {
|
|||
return mutations;
|
||||
}
|
||||
|
||||
// Accumulates mutation log value chunks, as both a vector of chunks and as a combined chunk,
|
||||
// in chunk order, and can check the chunk set for completion or intersection with a set
|
||||
// of ranges.
|
||||
struct AccumulatedMutations {
|
||||
AccumulatedMutations() : lastChunkNumber(-1) {}
|
||||
|
||||
// Add a KV pair for this mutation chunk set
|
||||
// It will be accumulated onto serializedMutations if the chunk number is
|
||||
// the next expected value.
|
||||
void addChunk(int chunkNumber, const KeyValueRef& kv) {
|
||||
if (chunkNumber == lastChunkNumber + 1) {
|
||||
lastChunkNumber = chunkNumber;
|
||||
serializedMutations += kv.value.toString();
|
||||
} else {
|
||||
lastChunkNumber = -2;
|
||||
serializedMutations.clear();
|
||||
}
|
||||
kvs.push_back(kv);
|
||||
void AccumulatedMutations::addChunk(int chunkNumber, const KeyValueRef& kv) {
|
||||
if (chunkNumber == lastChunkNumber + 1) {
|
||||
lastChunkNumber = chunkNumber;
|
||||
serializedMutations += kv.value.toString();
|
||||
} else {
|
||||
lastChunkNumber = -2;
|
||||
serializedMutations.clear();
|
||||
}
|
||||
kvs.push_back(kv);
|
||||
}
|
||||
|
||||
// Returns true if both
|
||||
// - 1 or more chunks were added to this set
|
||||
// - The header of the first chunk contains a valid protocol version and a length
|
||||
// that matches the bytes after the header in the combined value in serializedMutations
|
||||
bool isComplete() const {
|
||||
if (lastChunkNumber >= 0) {
|
||||
StringRefReader reader(serializedMutations, restore_corrupted_data());
|
||||
bool AccumulatedMutations::isComplete() const {
|
||||
if (lastChunkNumber >= 0) {
|
||||
StringRefReader reader(serializedMutations, restore_corrupted_data());
|
||||
|
||||
Version protocolVersion = reader.consume<uint64_t>();
|
||||
if (protocolVersion <= 0x0FDB00A200090001) {
|
||||
throw incompatible_protocol_version();
|
||||
}
|
||||
|
||||
uint32_t vLen = reader.consume<uint32_t>();
|
||||
return vLen == reader.remainder().size();
|
||||
Version protocolVersion = reader.consume<uint64_t>();
|
||||
if (protocolVersion <= 0x0FDB00A200090001) {
|
||||
throw incompatible_protocol_version();
|
||||
}
|
||||
|
||||
return false;
|
||||
uint32_t vLen = reader.consume<uint32_t>();
|
||||
return vLen == reader.remainder().size();
|
||||
}
|
||||
|
||||
// Returns true if a complete chunk contains any MutationRefs which intersect with any
|
||||
// range in ranges.
|
||||
// It is undefined behavior to run this if isComplete() does not return true.
|
||||
bool matchesAnyRange(const std::vector<KeyRange>& ranges) const {
|
||||
std::vector<MutationRef> mutations = decodeLogValue(serializedMutations);
|
||||
for (auto& m : mutations) {
|
||||
for (auto& r : ranges) {
|
||||
if (m.type == MutationRef::ClearRange) {
|
||||
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (r.contains(m.param1)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Returns true if a complete chunk contains any MutationRefs which intersect with any
|
||||
// range in ranges.
|
||||
// It is undefined behavior to run this if isComplete() does not return true.
|
||||
bool AccumulatedMutations::matchesAnyRange(const std::vector<KeyRange>& ranges) const {
|
||||
std::vector<MutationRef> mutations = decodeMutationLogValue(serializedMutations);
|
||||
for (auto& m : mutations) {
|
||||
for (auto& r : ranges) {
|
||||
if (m.type == MutationRef::ClearRange) {
|
||||
if (r.intersects(KeyRangeRef(m.param1, m.param2))) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
if (r.contains(m.param1)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
std::vector<KeyValueRef> kvs;
|
||||
std::string serializedMutations;
|
||||
int lastChunkNumber;
|
||||
};
|
||||
return false;
|
||||
}
|
||||
|
||||
// Returns a vector of filtered KV refs from data which are either part of incomplete mutation groups OR complete
|
||||
// and have data relevant to one of the KV ranges in ranges
|
||||
|
@ -3373,7 +3355,7 @@ std::vector<KeyValueRef> filterLogMutationKVPairs(VectorRef<KeyValueRef> data, c
|
|||
std::unordered_map<Version, AccumulatedMutations> mutationBlocksByVersion;
|
||||
|
||||
for (auto& kv : data) {
|
||||
auto versionAndChunkNumber = decodeLogKey(kv.key);
|
||||
auto versionAndChunkNumber = decodeMutationLogKey(kv.key);
|
||||
mutationBlocksByVersion[versionAndChunkNumber.first].addChunk(versionAndChunkNumber.second, kv);
|
||||
}
|
||||
|
||||
|
@ -3444,7 +3426,7 @@ struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
|
|||
|
||||
state Key mutationLogPrefix = restore.mutationLogPrefix();
|
||||
state Reference<IAsyncFile> inFile = wait(bc->readFile(logFile.fileName));
|
||||
state Standalone<VectorRef<KeyValueRef>> dataOriginal = wait(decodeLogFileBlock(inFile, readOffset, readLen));
|
||||
state Standalone<VectorRef<KeyValueRef>> dataOriginal = wait(decodeMutationLogFileBlock(inFile, readOffset, readLen));
|
||||
|
||||
// Filter the KV pairs extracted from the log file block to remove any records known to not be needed for this
|
||||
// restore based on the restore range set.
@ -77,7 +77,7 @@ void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any
|
|||
}
|
||||
|
||||
void GlobalConfig::insert(KeyRef key, ValueRef value) {
|
||||
TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
|
||||
// TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
|
||||
data.erase(key);
|
||||
|
||||
Arena arena(key.expectedSize() + value.expectedSize());
|
||||
|
@ -113,7 +113,7 @@ void GlobalConfig::erase(Key key) {
|
|||
}
|
||||
|
||||
void GlobalConfig::erase(KeyRangeRef range) {
|
||||
TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
|
||||
// TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
|
||||
auto it = data.begin();
|
||||
while (it != data.end()) {
|
||||
if (range.contains(it->first)) {
|
||||
|
@ -167,7 +167,7 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
|||
// attempt this migration at the same time, sometimes resulting in
|
||||
// aborts due to conflicts. Purposefully avoid retrying, making this
|
||||
// migration best-effort.
|
||||
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
|
||||
TraceEvent(SevInfo, "GlobalConfig_MigrationError").detail("What", e.what());
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -176,7 +176,7 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
|
|||
// Updates local copy of global configuration by reading the entire key-range
|
||||
// from storage.
|
||||
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
|
||||
TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
|
||||
// TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
|
||||
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
|
||||
|
||||
Transaction tr(self->cx);
@ -108,7 +108,6 @@ public:
|
|||
// the key.
|
||||
template <typename T, typename std::enable_if<std::is_arithmetic<T>{}, bool>::type = true>
|
||||
const T get(KeyRef name, T defaultVal) {
|
||||
TraceEvent(SevInfo, "GlobalConfig_Get").detail("Key", name);
|
||||
try {
|
||||
auto configValue = get(name);
|
||||
if (configValue.isValid()) {
|
|
|||
networkOptions.traceDirectory.get(),
|
||||
"trace",
|
||||
networkOptions.traceLogGroup,
|
||||
networkOptions.traceFileIdentifier);
|
||||
networkOptions.traceFileIdentifier,
|
||||
networkOptions.tracePartialFileSuffix);
|
||||
|
||||
TraceEvent("ClientStart")
|
||||
.detail("SourceVersion", getSourceVersion())
|
||||
|
@ -1856,6 +1857,10 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
|
|||
throw invalid_option_value();
|
||||
}
|
||||
break;
|
||||
case FDBNetworkOptions::TRACE_PARTIAL_FILE_SUFFIX:
|
||||
validateOptionValuePresent(value);
|
||||
networkOptions.tracePartialFileSuffix = value.get().toString();
|
||||
break;
|
||||
case FDBNetworkOptions::KNOB: {
|
||||
validateOptionValuePresent(value);
|
||||
|
||||
|
@ -4088,8 +4093,7 @@ SpanID generateSpanID(int transactionTracingEnabled) {
|
|||
}
|
||||
}
|
||||
|
||||
Transaction::Transaction()
|
||||
: info(TaskPriority::DefaultEndpoint, generateSpanID(true)), span(info.spanID, "Transaction"_loc) {}
|
||||
Transaction::Transaction() = default;
|
||||
|
||||
Transaction::Transaction(Database const& cx)
|
||||
: info(cx->taskID, generateSpanID(cx->transactionTracingEnabled)), numErrors(0), options(cx),
@ -68,6 +68,7 @@ struct NetworkOptions {
|
|||
std::string traceFormat;
|
||||
std::string traceClockSource;
|
||||
std::string traceFileIdentifier;
|
||||
std::string tracePartialFileSuffix;
|
||||
Optional<bool> logClientInfo;
|
||||
Reference<ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>> supportedVersions;
|
||||
bool runLoopProfilingEnabled;
@ -179,6 +180,9 @@ struct TransactionInfo {
|
|||
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
|
||||
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
|
||||
|
||||
// Only available so that Transaction can have a default constructor, for use in state variables
|
||||
TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {}
|
||||
|
||||
explicit TransactionInfo(TaskPriority taskID, SpanID spanID)
|
||||
: taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
|
||||
};
@ -71,7 +71,7 @@ class SimpleConfigTransactionImpl {
|
|||
if (reply.value.present()) {
|
||||
return reply.value.get().toValue();
|
||||
} else {
|
||||
return {};
|
||||
return Optional<Value>{};
|
||||
}
|
||||
}
|
||||
@ -57,6 +57,9 @@ description is not currently required but encouraged.
|
|||
<Option name="trace_file_identifier" code="36"
|
||||
paramType="String" paramDescription="The identifier that will be part of all trace file names"
|
||||
description="Once provided, this string will be used to replace the port/PID in the log file names." />
|
||||
<Option name="trace_partial_file_suffix" code="39"
|
||||
paramType="String" paramDesciption="Append this suffix to partially written log files. When a log file is complete, it is renamed to remove the suffix. No separator is added between the file and the suffix. If you want to add a file extension, you should include the separator - e.g. '.tmp' instead of 'tmp' to add the 'tmp' extension."
|
||||
description="" />
|
||||
<Option name="knob" code="40"
|
||||
paramType="String" paramDescription="knob_name=knob_value"
|
||||
description="Set internal tuning or debugging knobs"/>
@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Reference<Histogram> dist,
|
||||
Future<ResolveTransactionBatchReply> in) {
|
||||
state double startTime = now();
|
||||
ResolveTransactionBatchReply reply = wait(in);
|
||||
dist->sampleSeconds(now() - startTime);
|
||||
return reply;
|
||||
}
|
||||
|
||||
namespace CommitBatch {
|
||||
|
||||
struct CommitBatchContext {
|
||||
|
@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
|
|||
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch
|
||||
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
|
||||
double queuingDelay = g_network->now() - timeStart;
|
||||
pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay);
|
||||
if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
|
||||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) &&
|
||||
SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) {
|
||||
|
@ -619,6 +628,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
|
|||
pProxyCommitData->commitVersionRequestNumber++,
|
||||
pProxyCommitData->mostRecentProcessedRequestNumber,
|
||||
pProxyCommitData->dbgid);
|
||||
state double beforeGettingCommitVersion = now();
|
||||
GetCommitVersionReply versionReply = wait(brokenPromiseToNever(
|
||||
pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)));
|
||||
|
||||
|
@ -626,6 +636,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
|
|||
|
||||
pProxyCommitData->stats.txnCommitVersionAssigned += trs.size();
|
||||
pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version;
|
||||
pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion);
|
||||
|
||||
self->commitVersion = versionReply.version;
|
||||
self->prevVersion = versionReply.prevVersion;
|
||||
|
@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
||||
state double resolutionStart = now();
|
||||
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
|
||||
// resolution processing but is still using CPU
|
||||
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
|
||||
|
@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
std::vector<Future<ResolveTransactionBatchReply>> replies;
|
||||
for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) {
|
||||
requests.requests[r].debugID = self->debugID;
|
||||
replies.push_back(brokenPromiseToNever(
|
||||
pProxyCommitData->resolvers[r].resolve.getReply(requests.requests[r], TaskPriority::ProxyResolverReply)));
|
||||
replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r],
|
||||
brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply(
|
||||
requests.requests[r], TaskPriority::ProxyResolverReply))));
|
||||
}
|
||||
|
||||
self->transactionResolverMap.swap(requests.transactionResolverMap);
|
||||
|
@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
|
|||
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
|
||||
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
|
||||
|
||||
self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart);
|
||||
if (self->debugID.present()) {
|
||||
g_traceBatch.addEvent(
|
||||
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
|
||||
|
@ -1052,6 +1066,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
||||
state double postResolutionStart = now();
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
state std::vector<CommitTransactionRequest>& trs = self->trs;
|
||||
state const int64_t localBatchNumber = self->localBatchNumber;
|
||||
|
@ -1061,6 +1076,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
|||
bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1;
|
||||
TEST(queuedCommits); // Queuing post-resolution commit processing
|
||||
wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
|
||||
state double postResolutionQueuing = now();
|
||||
pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart);
|
||||
wait(yield(TaskPriority::ProxyCommitYield1));
|
||||
|
||||
self->computeStart = g_network->timer();
|
||||
|
@ -1209,10 +1226,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
|||
1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
|
||||
}
|
||||
|
||||
pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
|
||||
state double tLoggingStart = now();
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
state Span span("MP:transactionLogging"_loc, self->span.context);
|
||||
|
||||
|
@ -1246,11 +1265,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
|
|||
pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo);
|
||||
}
|
||||
pProxyCommitData->logSystem->popTxs(self->msg.popTo);
|
||||
|
||||
pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart);
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> reply(CommitBatchContext* self) {
|
||||
state double replyStart = now();
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
state Span span("MP:reply"_loc, self->span.context);
@ -1382,7 +1402,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
|||
pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount;
|
||||
ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
|
||||
wait(self->releaseFuture);
|
||||
|
||||
pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart);
|
||||
return Void();
|
||||
}
|
||||
|
||||
|
@ -1853,7 +1873,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
|
|||
|
||||
commitData.resolvers = commitData.db->get().resolvers;
|
||||
ASSERT(commitData.resolvers.size() != 0);
|
||||
|
||||
for (int i = 0; i < commitData.resolvers.size(); ++i) {
|
||||
commitData.stats.resolverDist.push_back(Histogram::getHistogram(
|
||||
LiteralStringRef("CommitProxy"), "ToResolver_" + commitData.resolvers[i].id().toString(), Histogram::Unit::microseconds));
|
||||
}
|
||||
auto rs = commitData.keyResolvers.modify(allKeys);
|
||||
for (auto r = rs.begin(); r != rs.end(); ++r)
|
||||
r->value().emplace_back(0, 0);
@ -3967,7 +3967,6 @@ ACTOR Future<Void> updateNextWigglingStoragePID(DDTeamCollection* teamCollection
|
|||
ACTOR Future<Void> perpetualStorageWiggleIterator(AsyncVar<bool>* stopSignal,
|
||||
FutureStream<Void> finishStorageWiggleSignal,
|
||||
DDTeamCollection* teamCollection) {
|
||||
state int lastFinishTime = now();
|
||||
loop {
|
||||
choose {
|
||||
when(wait(stopSignal->onChange())) {}
|
||||
|
@ -4047,8 +4046,7 @@ ACTOR Future<Void> clusterHealthCheckForPerpetualWiggle(DDTeamCollection* self,
|
|||
// cluster is unhealthy and restarted once the cluster is healthy again.
|
||||
ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
|
||||
PromiseStream<Void> finishStorageWiggleSignal,
|
||||
DDTeamCollection* self,
|
||||
const DDEnabledState* ddEnabledState) {
|
||||
DDTeamCollection* self) {
|
||||
state Future<Void> watchFuture = Never();
|
||||
state Future<Void> moveFinishFuture = Never();
|
||||
state int extraTeamCount = 0;
|
||||
|
@ -4132,8 +4130,7 @@ ACTOR Future<Void> perpetualStorageWiggler(AsyncVar<bool>* stopSignal,
|
|||
// This coroutine sets a watch to monitor the value change of `perpetualStorageWiggleKey` which is controlled by command
|
||||
// `configure perpetual_storage_wiggle=$value` if the value is 1, this actor start 2 actors,
|
||||
// `perpetualStorageWiggleIterator` and `perpetualStorageWiggler`. Otherwise, it sends stop signal to them.
|
||||
ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection,
|
||||
const DDEnabledState* ddEnabledState) {
|
||||
ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollection) {
|
||||
state int speed = 0;
|
||||
state AsyncVar<bool> stopWiggleSignal(true);
|
||||
state PromiseStream<Void> finishStorageWiggleSignal;
|
||||
|
@ -4158,8 +4155,8 @@ ACTOR Future<Void> monitorPerpetualStorageWiggle(DDTeamCollection* teamCollectio
|
|||
stopWiggleSignal.set(false);
|
||||
collection.add(perpetualStorageWiggleIterator(
|
||||
&stopWiggleSignal, finishStorageWiggleSignal.getFuture(), teamCollection));
|
||||
collection.add(perpetualStorageWiggler(
|
||||
&stopWiggleSignal, finishStorageWiggleSignal, teamCollection, ddEnabledState));
|
||||
collection.add(
|
||||
perpetualStorageWiggler(&stopWiggleSignal, finishStorageWiggleSignal, teamCollection));
|
||||
TraceEvent("PerpetualStorageWiggleOpen", teamCollection->distributorId).log();
|
||||
} else if (speed == 0) {
|
||||
if (!stopWiggleSignal.get()) {
|
||||
|
@ -5564,7 +5561,7 @@ ACTOR Future<Void> dataDistributionTeamCollection(Reference<DDTeamCollection> te
|
|||
self->addActor.send(waitHealthyZoneChange(self));
|
||||
|
||||
if (self->primary) { // the primary dc also handle the satellite dc's perpetual wiggling
|
||||
self->addActor.send(monitorPerpetualStorageWiggle(self, ddEnabledState));
|
||||
self->addActor.send(monitorPerpetualStorageWiggle(self));
|
||||
}
|
||||
// SOMEDAY: Monitor FF/serverList for (new) servers that aren't in allServers and add or remove them
@ -57,6 +57,8 @@ struct GrvProxyStats {
|
|||
Deque<int> requestBuckets;
|
||||
double lastBucketBegin;
|
||||
double bucketInterval;
|
||||
Reference<Histogram> grvConfirmEpochLiveDist;
|
||||
Reference<Histogram> grvGetCommittedVersionRpcDist;
|
||||
|
||||
void updateRequestBuckets() {
|
||||
while (now() - lastBucketBegin > bucketInterval) {
|
||||
|
@ -112,7 +114,13 @@ struct GrvProxyStats {
|
|||
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
|
||||
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
|
||||
recentRequests(0), lastBucketBegin(now()),
|
||||
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS) {
|
||||
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
|
||||
grvConfirmEpochLiveDist(Histogram::getHistogram(LiteralStringRef("GrvProxy"),
|
||||
LiteralStringRef("GrvConfirmEpochLive"),
|
||||
Histogram::Unit::microseconds)),
|
||||
grvGetCommittedVersionRpcDist(Histogram::getHistogram(LiteralStringRef("GrvProxy"),
|
||||
LiteralStringRef("GrvGetCommittedVersionRpc"),
|
||||
Histogram::Unit::microseconds)) {
|
||||
// The rate at which the limit(budget) is allowed to grow.
|
||||
specialCounter(cc, "SystemGRVQueueSize", [this]() { return this->systemGRVQueueSize; });
|
||||
specialCounter(cc, "DefaultGRVQueueSize", [this]() { return this->defaultGRVQueueSize; });
|
||||
|
@ -526,6 +534,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
|
|||
// and no other proxy could have already committed anything without first ending the epoch
|
||||
state Span span("GP:getLiveCommittedVersion"_loc, parentSpan);
|
||||
++grvProxyData->stats.txnStartBatch;
|
||||
|
||||
state double grvStart = now();
|
||||
state Future<GetRawCommittedVersionReply> replyFromMasterFuture;
|
||||
replyFromMasterFuture = grvProxyData->master.getLiveCommittedVersion.getReply(
|
||||
GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::GetLiveCommittedVersionReply);
|
||||
|
@ -537,6 +547,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
|
|||
wait(grvProxyData->lastCommitTime.whenAtLeast(now() - SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION));
|
||||
}
|
||||
|
||||
state double grvConfirmEpochLive = now();
|
||||
grvProxyData->stats.grvConfirmEpochLiveDist->sampleSeconds(grvConfirmEpochLive - grvStart);
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent(
|
||||
"TransactionDebug", debugID.get().first(), "GrvProxyServer.getLiveCommittedVersion.confirmEpochLive");
|
||||
|
@ -546,6 +558,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
|
|||
grvProxyData->minKnownCommittedVersion =
|
||||
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
|
||||
|
||||
grvProxyData->stats.grvGetCommittedVersionRpcDist->sampleSeconds(now() - grvConfirmEpochLive);
|
||||
GetReadVersionReply rep;
|
||||
rep.version = repFromMaster.version;
|
||||
rep.locked = repFromMaster.locked;
@ -31,6 +31,7 @@
|
|||
#include "fdbserver/MutationTracking.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/Error.h"
|
||||
#include "flow/Histogram.h"
|
||||
#include "flow/IndexedSet.h"
|
||||
#include "flow/Knobs.h"
|
||||
#include "fdbrpc/ReplicationPolicy.h"
|
||||
|
@ -57,6 +58,7 @@ public:
|
|||
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
|
||||
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
|
||||
std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers;
|
||||
std::vector<Reference<Histogram>> tlogPushDistTrackers;
|
||||
int32_t tLogWriteAntiQuorum;
|
||||
int32_t tLogReplicationFactor;
|
||||
std::vector<LocalityData> tLogLocalities; // Stores the localities of the log servers
@ -74,6 +74,15 @@ struct ProxyStats {
int64_t maxComputeNS;
int64_t minComputeNS;

Reference<Histogram> commitBatchQueuingDist;
Reference<Histogram> getCommitVersionDist;
std::vector<Reference<Histogram>> resolverDist;
Reference<Histogram> resolutionDist;
Reference<Histogram> postResolutionDist;
Reference<Histogram> processingMutationDist;
Reference<Histogram> tlogLoggingDist;
Reference<Histogram> replyCommitDist;

int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;

@ -113,7 +122,28 @@ struct ProxyStats {
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
maxComputeNS(0), minComputeNS(1e12) {
maxComputeNS(0), minComputeNS(1e12),
commitBatchQueuingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("CommitBatchQueuing"),
Histogram::Unit::microseconds)),
getCommitVersionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("GetCommitVersion"),
Histogram::Unit::microseconds)),
resolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("Resolution"),
Histogram::Unit::microseconds)),
postResolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("PostResolutionQueuing"),
Histogram::Unit::microseconds)),
processingMutationDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ProcessingMutation"),
Histogram::Unit::microseconds)),
tlogLoggingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("TlogLogging"),
Histogram::Unit::microseconds)),
replyCommitDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ReplyCommit"),
Histogram::Unit::microseconds)) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
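The ProxyStats constructor above registers one Histogram per commit-pipeline stage through Histogram::getHistogram(group, op, unit), and each stage later records its latency with sampleSeconds(). A rough sketch of that registration pattern, a per-(group, op) registry that hands out shared histograms (all names here are illustrative, not flow's actual implementation):

#include <map>
#include <memory>
#include <string>
#include <utility>

struct HistogramSketch {
    enum class Unit { microseconds };
    // Return the histogram registered under (group, op), creating it on first use,
    // so every caller asking for the same pair shares one instance.
    static std::shared_ptr<HistogramSketch> getHistogram(const std::string& group, const std::string& op, Unit) {
        static std::map<std::pair<std::string, std::string>, std::shared_ptr<HistogramSketch>> registry;
        auto& h = registry[{ group, op }];
        if (!h)
            h = std::make_shared<HistogramSketch>();
        return h;
    }
    void sampleSeconds(double) { /* bucket the sample */ }
};

struct ProxyStatsSketch {
    std::shared_ptr<HistogramSketch> commitBatchQueuingDist;
    std::shared_ptr<HistogramSketch> getCommitVersionDist;
    ProxyStatsSketch()
      : commitBatchQueuingDist(HistogramSketch::getHistogram("CommitProxy", "CommitBatchQueuing", HistogramSketch::Unit::microseconds)),
        getCommitVersionDist(HistogramSketch::getHistogram("CommitProxy", "GetCommitVersion", HistogramSketch::Unit::microseconds)) {}
};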
@ -527,6 +527,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
}

ACTOR static Future<TLogCommitReply> recordPushMetrics(Reference<ConnectionResetInfo> self,
Reference<Histogram> dist,
NetworkAddress addr,
Future<TLogCommitReply> in) {
state double startTime = now();

@ -541,6 +542,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
self->fastReplies++;
}
}
dist->sampleSeconds(now() - startTime);
return t;
}

@ -563,12 +565,21 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
}
}
if (it->tlogPushDistTrackers.empty()) {
for (int i = 0; i < it->logServers.size(); i++) {
it->tlogPushDistTrackers.push_back(
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
it->logServers[i]->get().interf().address().toString(),
Histogram::Unit::microseconds));
}
}
vector<Future<Void>> tLogCommitResults;
for (int loc = 0; loc < it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
data.recordEmptyMessage(location, msg);
allReplies.push_back(recordPushMetrics(
it->connectionResetTrackers[loc],
it->tlogPushDistTrackers[loc],
it->logServers[loc]->get().interf().address(),
it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext,
msg.arena(),
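recordPushMetrics above wraps each TLog commit reply so the time from push to reply lands in a per-TLog histogram, with the trackers created lazily and keyed by the TLog's uniqueID and address. An analogous wrapper in plain C++, with std::future and a hypothetical histogram standing in for flow's Future and Histogram (flow would co-await rather than block):

#include <chrono>
#include <future>

struct HistogramSketch {
    void sampleSeconds(double) { /* bucket the sample */ }
};

// Wait for a reply and record how long it took, mirroring the shape of recordPushMetrics().
template <class Reply>
Reply recordPushMetricsSketch(HistogramSketch& dist, std::future<Reply> in) {
    auto start = std::chrono::steady_clock::now();
    Reply r = in.get(); // flow suspends the actor here instead of blocking a thread
    dist.sampleSeconds(std::chrono::duration<double>(std::chrono::steady_clock::now() - start).count());
    return r;
}

int main() {
    HistogramSketch toTlogDist;
    auto reply = std::async(std::launch::async, [] { return 42; });
    recordPushMetricsSketch(toTlogDist, std::move(reply));
}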
@ -219,6 +219,7 @@ public:

template <class U>
Optional(const U& t) : impl(std::in_place, t) {}
Optional(T&& t) : impl(std::in_place, std::move(t)) {}

/* This conversion constructor was nice, but combined with the prior constructor it means that Optional<int> can be
converted to Optional<Optional<int>> in the wrong way (a non-present Optional<int> converts to a non-present
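The comment being extended above explains the hazard of pairing a templated converting constructor with conversions from Optional<U>: a non-present Optional<int> could end up as a non-present Optional<Optional<int>> instead of a present outer value holding an empty inner one. std::optional faces the same design question; this small illustrative example shows the behavior the comment wants to preserve:

#include <cassert>
#include <optional>

int main() {
    std::optional<int> empty; // not present
    // Wrapping picks the value-converting constructor, so the outer optional is
    // present and holds an empty inner optional...
    std::optional<std::optional<int>> wrapped = empty;
    assert(wrapped.has_value() && !wrapped->has_value());
    // ...whereas a converting constructor that forwarded "emptiness" outward would
    // silently yield a non-present outer optional, the ambiguity warned about above.
}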
@ -19,6 +19,7 @@
*/

#include "flow/FileTraceLogWriter.h"
#include "flow/Platform.h"
#include "flow/flow.h"
#include "flow/ThreadHelper.actor.h"

@ -87,11 +88,13 @@ FileTraceLogWriter::FileTraceLogWriter(std::string const& directory,
std::string const& processName,
std::string const& basename,
std::string const& extension,
std::string const& tracePartialFileSuffix,
uint64_t maxLogsSize,
std::function<void()> const& onError,
Reference<ITraceLogIssuesReporter> const& issues)
: directory(directory), processName(processName), basename(basename), extension(extension), maxLogsSize(maxLogsSize),
traceFileFD(-1), index(0), issues(issues), onError(onError) {}
: directory(directory), processName(processName), basename(basename), extension(extension),
tracePartialFileSuffix(tracePartialFileSuffix), maxLogsSize(maxLogsSize), traceFileFD(-1), index(0), issues(issues),
onError(onError) {}

void FileTraceLogWriter::addref() {
ReferenceCounted<FileTraceLogWriter>::addref();

@ -158,7 +161,8 @@ void FileTraceLogWriter::open() {
// log10(index) < 10
UNSTOPPABLE_ASSERT(indexWidth < 10);

auto finalname = format("%s.%d.%d.%s", basename.c_str(), indexWidth, index, extension.c_str());
finalname =
format("%s.%d.%d.%s%s", basename.c_str(), indexWidth, index, extension.c_str(), tracePartialFileSuffix.c_str());
while ((traceFileFD = __open(finalname.c_str(), TRACEFILE_FLAGS, TRACEFILE_MODE)) == -1) {
lastError(errno);
if (errno == EEXIST) {

@ -166,7 +170,12 @@ void FileTraceLogWriter::open() {
indexWidth = unsigned(::floor(log10f(float(index))));

UNSTOPPABLE_ASSERT(indexWidth < 10);
finalname = format("%s.%d.%d.%s", basename.c_str(), indexWidth, index, extension.c_str());
finalname = format("%s.%d.%d.%s%s",
basename.c_str(),
indexWidth,
index,
extension.c_str(),
tracePartialFileSuffix.c_str());
} else {
fprintf(stderr,
"ERROR: could not create trace log file `%s' (%d: %s)\n",

@ -178,7 +187,7 @@ void FileTraceLogWriter::open() {

int errorNum = errno;
onMainThreadVoid(
[finalname, errorNum] {
[finalname = finalname, errorNum] {
TraceEvent(SevWarnAlways, "TraceFileOpenError")
.detail("Filename", finalname)
.detail("ErrorCode", errorNum)

@ -201,6 +210,11 @@ void FileTraceLogWriter::close() {
while (__close(traceFileFD))
threadSleep(0.1);
}
traceFileFD = -1;
if (!tracePartialFileSuffix.empty()) {
renameFile(finalname, finalname.substr(0, finalname.size() - tracePartialFileSuffix.size()));
}
finalname = "";
}

void FileTraceLogWriter::roll() {

@ -216,6 +230,15 @@ void FileTraceLogWriter::cleanupTraceFiles() {
// Setting maxLogsSize=0 disables trace file cleanup based on dir size
if (!g_network->isSimulated() && maxLogsSize > 0) {
try {
// Rename/finalize any stray files ending in tracePartialFileSuffix for this process.
if (!tracePartialFileSuffix.empty()) {
for (const auto& f : platform::listFiles(directory, tracePartialFileSuffix)) {
if (f.substr(0, processName.length()) == processName) {
renameFile(f, f.substr(0, f.size() - tracePartialFileSuffix.size()));
}
}
}

std::vector<std::string> existingFiles = platform::listFiles(directory, extension);
std::vector<std::string> existingTraceFiles;
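Taken together, the FileTraceLogWriter changes above implement "partial" trace files: while a log is being written it carries the tracePartialFileSuffix, and the suffix is stripped by a rename on close() (or by cleanupTraceFiles() for stray files left by a crashed process), so only completed logs ever carry the final name. A small illustration of that lifecycle, with std::filesystem standing in for flow's renameFile/listFiles and ".part" as an example suffix value:

#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

int main() {
    const std::string suffix = ".part";                 // example tracePartialFileSuffix
    const std::string finalName = "trace.1.1.xml";      // name the finished file should have
    const std::string partialName = finalName + suffix; // name used while still open

    { std::ofstream out(partialName); out << "<Trace/>"; } // write while "open"
    fs::rename(partialName, finalName);                     // close() strips the suffix

    // cleanupTraceFiles(): finalize stray partial files from earlier runs.
    for (const auto& entry : fs::directory_iterator(".")) {
        const std::string name = entry.path().filename().string();
        if (name.size() > suffix.size() && name.compare(name.size() - suffix.size(), suffix.size(), suffix) == 0)
            fs::rename(entry.path(), name.substr(0, name.size() - suffix.size()));
    }
    std::cout << "finalized " << finalName << "\n";
}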
@ -51,6 +51,8 @@ private:
std::string processName;
std::string basename;
std::string extension;
std::string finalname;
std::string tracePartialFileSuffix;

uint64_t maxLogsSize;
int traceFileFD;

@ -66,6 +68,7 @@ public:
std::string const& processName,
std::string const& basename,
std::string const& extension,
std::string const& tracePartialFileSuffix,
uint64_t maxLogsSize,
std::function<void()> const& onError,
Reference<ITraceLogIssuesReporter> const& issues);
@ -117,10 +117,12 @@ void Histogram::writeToLog() {
TraceEvent e(SevInfo, "Histogram");
e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper[(size_t)unit]);

int totalCount = 0;
for (uint32_t i = 0; i < 32; i++) {
uint64_t value = uint64_t(1) << (i + 1);

if (buckets[i]) {
totalCount += buckets[i];
switch (unit) {
case Unit::microseconds:
e.detail(format("LessThan%u.%03u", value / 1000, value % 1000), buckets[i]);

@ -140,6 +142,7 @@ void Histogram::writeToLog() {
}
}
}
e.detail("TotalCount", totalCount);
}

std::string Histogram::drawHistogram() {
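writeToLog() above walks 32 power-of-two buckets, where bucket i counts samples below 2^(i+1) (microseconds, hence the millisecond "LessThan%u.%03u" label), and the added TotalCount detail sums every non-empty bucket. A compact sketch of that walk:

#include <cstdint>
#include <cstdio>

int main() {
    uint32_t buckets[32] = {}; // buckets[i] counts samples below 2^(i+1) microseconds
    buckets[10] = 3;           // e.g. three samples under 2048 us
    int totalCount = 0;
    for (uint32_t i = 0; i < 32; i++) {
        uint64_t value = uint64_t(1) << (i + 1);
        if (buckets[i]) {
            totalCount += buckets[i];
            // Same label shape as the microseconds case: whole milliseconds, then microseconds.
            std::printf("LessThan%llu.%03llu = %u\n",
                        (unsigned long long)(value / 1000),
                        (unsigned long long)(value % 1000),
                        buckets[i]);
        }
    }
    std::printf("TotalCount = %d\n", totalCount);
}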
@ -265,43 +265,29 @@ static std::string toLower(std::string const& name) {
return lower_name;
}

template <class T>
static T parseIntegral(std::string const& value) {
T v;
int n = 0;
if (StringRef(value).startsWith(LiteralStringRef("0x"))) {
if (sscanf(value.c_str(), "0x%" SCNx64 "%n", &v, &n) != 1 || n != value.size())
throw invalid_option_value();
} else {
if (sscanf(value.c_str(), "%" SCNd64 "%n", &v, &n) != 1 || n != value.size())
throw invalid_option_value();
}
return v;
}

ParsedKnobValue Knobs::parseKnobValue(std::string const& knob, std::string const& value) const {
if (double_knobs.count(knob)) {
double v;
int n = 0;
if (sscanf(value.c_str(), "%lf%n", &v, &n) != 1 || n != value.size())
throw invalid_option_value();
return v;
} else if (bool_knobs.count(knob)) {
if (toLower(value) == "true") {
return true;
} else if (toLower(value) == "false") {
return false;
} else {
return parseIntegral<bool>(value);
try {
if (double_knobs.count(knob)) {
return std::stod(value);
} else if (bool_knobs.count(knob)) {
if (toLower(value) == "true") {
return true;
} else if (toLower(value) == "false") {
return false;
} else {
return (std::stoi(value) != 0);
}
} else if (int64_knobs.count(knob)) {
return static_cast<int64_t>(std::stol(value, nullptr, 0));
} else if (int_knobs.count(knob)) {
return std::stoi(value, nullptr, 0);
} else if (string_knobs.count(knob)) {
return value;
}
} else if (int64_knobs.count(knob)) {
return parseIntegral<int64_t>(value);
} else if (int_knobs.count(knob)) {
return parseIntegral<int>(value);
} else if (string_knobs.count(knob)) {
return value;
return NoKnobFound{};
} catch (...) {
throw invalid_option_value();
}
return NoKnobFound{};
}

bool Knobs::setKnob(std::string const& knob, int value) {
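The parseKnobValue rewrite above drops the sscanf-based parseIntegral helper in favor of std::stod/std::stoi/std::stol (base 0, so hex values such as "0x1000" still parse) and turns any parse exception into invalid_option_value. A standalone sketch of the same dispatch, with a simplified variant standing in for ParsedKnobValue and std::invalid_argument standing in for invalid_option_value():

#include <cstdint>
#include <set>
#include <stdexcept>
#include <string>
#include <variant>

struct NoKnobFound {};
using ParsedValue = std::variant<NoKnobFound, double, int64_t, int, std::string>;

ParsedValue parseKnobValueSketch(const std::set<std::string>& doubleKnobs,
                                 const std::set<std::string>& int64Knobs,
                                 const std::set<std::string>& intKnobs,
                                 const std::string& knob,
                                 const std::string& value) {
    try {
        if (doubleKnobs.count(knob))
            return std::stod(value);
        if (int64Knobs.count(knob))
            return static_cast<int64_t>(std::stol(value, nullptr, 0)); // base 0: decimal, hex, or octal
        if (intKnobs.count(knob))
            return std::stoi(value, nullptr, 0);
        return NoKnobFound{};
    } catch (...) {
        throw std::invalid_argument("invalid_option_value");
    }
}

int main() {
    auto v = parseKnobValueSketch({ "ratio" }, { "bytes" }, {}, "bytes", "0x1000");
    return std::get<int64_t>(v) == 4096 ? 0 : 1;
}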
@ -90,6 +90,7 @@ EncryptionStreamCipher::EncryptionStreamCipher(const StreamCipher::Key& key, con
}

StringRef EncryptionStreamCipher::encrypt(unsigned char const* plaintext, int len, Arena& arena) {
TEST(true); // Encrypting data with StreamCipher
auto ciphertext = new (arena) unsigned char[len + AES_BLOCK_SIZE];
int bytes{ 0 };
EVP_EncryptUpdate(cipher.getCtx(), ciphertext, &bytes, plaintext, len);

@ -110,6 +111,7 @@ DecryptionStreamCipher::DecryptionStreamCipher(const StreamCipher::Key& key, con
}

StringRef DecryptionStreamCipher::decrypt(unsigned char const* ciphertext, int len, Arena& arena) {
TEST(true); // Decrypting data with StreamCipher
auto plaintext = new (arena) unsigned char[len];
int bytesDecrypted{ 0 };
EVP_DecryptUpdate(cipher.getCtx(), plaintext, &bytesDecrypted, ciphertext, len);
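encrypt() above hands plaintext to EVP_EncryptUpdate and reserves len + AES_BLOCK_SIZE output bytes, since a block cipher mode may hold back up to one block. A minimal self-contained OpenSSL example of that update call (AES-256-GCM is chosen here purely for illustration, and error handling is omitted):

#include <openssl/aes.h>
#include <openssl/evp.h>
#include <cstdio>
#include <cstring>
#include <vector>

int main() {
    unsigned char key[32] = { 0 }; // illustration only: all-zero key
    unsigned char iv[12] = { 0 };  // illustration only: all-zero IV
    const char* msg = "hello foundationdb";
    const int len = (int)std::strlen(msg);

    EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
    EVP_EncryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, key, iv);

    // Reserve len + AES_BLOCK_SIZE, matching the headroom encrypt() allocates above.
    std::vector<unsigned char> ciphertext(len + AES_BLOCK_SIZE);
    int bytes = 0;
    EVP_EncryptUpdate(ctx, ciphertext.data(), &bytes, (const unsigned char*)msg, len);
    std::printf("produced %d ciphertext bytes\n", bytes);

    EVP_CIPHER_CTX_free(ctx);
}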
@ -52,6 +52,7 @@ class ThreadSafeQueue : NonCopyable {
struct Node : BaseNode, FastAllocated<Node> {
T data;
Node(T const& data) : data(data) {}
Node(T&& data) : data(std::move(data)) {}
};
std::atomic<BaseNode*> head;
BaseNode* tail;

@ -131,9 +132,9 @@ public:
}

// If push() returns true, the consumer may be sleeping and should be woken
bool push(T const& data) {
Node* n = new Node(data);
n->data = data;
template <class U>
bool push(U&& data) {
Node* n = new Node(std::forward<U>(data));
return pushNode(n) == &sleeping;
}
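The push() change above collapses the copy-only overload (which also assigned n->data a second time after construction) into one perfect-forwarding template, so lvalues are copied and rvalues are moved straight into the freshly allocated node. The same idiom in isolation:

#include <iostream>
#include <string>
#include <utility>

struct Node {
    std::string data;
    Node(const std::string& d) : data(d) {}       // chosen for lvalues (copy)
    Node(std::string&& d) : data(std::move(d)) {} // chosen for rvalues (move)
};

// std::forward preserves the caller's value category, selecting copy or move above.
template <class U>
Node* push(U&& data) {
    return new Node(std::forward<U>(data));
}

int main() {
    std::string s = "persistent";
    Node* a = push(s);                   // lvalue: copied, s remains valid
    Node* b = push(std::string("temp")); // rvalue: moved
    std::cout << a->data << " " << b->data << "\n";
    delete a;
    delete b;
}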
@ -107,7 +107,7 @@ private:
std::vector<TraceEventFields> eventBuffer;
int loggedLength;
int bufferLength;
bool opened;
std::atomic<bool> opened;
int64_t preopenOverflowCount;
std::string basename;
std::string logGroup;

@ -115,6 +115,7 @@ private:
std::string directory;
std::string processName;
Optional<NetworkAddress> localAddress;
std::string tracePartialFileSuffix;

Reference<IThreadPool> writer;
uint64_t rollsize;

@ -288,13 +289,15 @@ public:
std::string const& timestamp,
uint64_t rs,
uint64_t maxLogsSize,
Optional<NetworkAddress> na) {
Optional<NetworkAddress> na,
std::string const& tracePartialFileSuffix) {
ASSERT(!writer && !opened);

this->directory = directory;
this->processName = processName;
this->logGroup = logGroup;
this->localAddress = na;
this->tracePartialFileSuffix = tracePartialFileSuffix;

basename = format("%s/%s.%s.%s",
directory.c_str(),

@ -306,6 +309,7 @@ public:
processName,
basename,
formatter->getExtension(),
tracePartialFileSuffix,
maxLogsSize,
[this]() { barriers->triggerAll(); },
issues));

@ -715,7 +719,8 @@ void openTraceFile(const NetworkAddress& na,
std::string directory,
std::string baseOfBase,
std::string logGroup,
std::string identifier) {
std::string identifier,
std::string tracePartialFileSuffix) {
if (g_traceLog.isOpen())
return;

@ -739,7 +744,8 @@ void openTraceFile(const NetworkAddress& na,
format("%lld", time(nullptr)),
rollsize,
maxLogsSize,
!g_network->isSimulated() ? na : Optional<NetworkAddress>());
!g_network->isSimulated() ? na : Optional<NetworkAddress>(),
tracePartialFileSuffix);

uncancellable(recurring(&flushTraceFile, FLOW_KNOBS->TRACE_FLUSH_INTERVAL, TaskPriority::FlushTrace));
g_traceBatch.dump();
@ -564,7 +564,8 @@ void openTraceFile(const NetworkAddress& na,
std::string directory = ".",
std::string baseOfBase = "trace",
std::string logGroup = "default",
std::string identifier = "");
std::string identifier = "",
std::string tracePartialFileSuffix = "");
void initTraceEventMetrics();
void closeTraceFile();
bool traceFileIsOpen();