merge master, fix conflicts

Xiaoxi Wang 2021-08-10 09:11:55 -07:00
commit 1f6cee89ab
55 changed files with 784 additions and 424 deletions

.gitignore
View File

@ -8,7 +8,7 @@ bindings/java/foundationdb-client*.jar
bindings/java/foundationdb-tests*.jar
bindings/java/fdb-java-*-sources.jar
packaging/msi/FDBInstaller.msi
builds/
build/
cmake-build-debug/
# Generated source, build, and packaging files
*.g.cpp

View File

@ -171,7 +171,7 @@ add_subdirectory(fdbbackup)
add_subdirectory(contrib)
add_subdirectory(tests)
add_subdirectory(flowbench EXCLUDE_FROM_ALL)
if(WITH_PYTHON)
if(WITH_PYTHON AND WITH_C_BINDING)
add_subdirectory(bindings)
endif()
if(WITH_DOCUMENTATION)

View File

@ -3,14 +3,16 @@ if(NOT OPEN_FOR_IDE)
add_subdirectory(c)
add_subdirectory(flow)
endif()
add_subdirectory(python)
if(WITH_JAVA)
if(WITH_PYTHON_BINDING)
add_subdirectory(python)
endif()
if(WITH_JAVA_BINDING)
add_subdirectory(java)
endif()
if(WITH_GO AND NOT OPEN_FOR_IDE)
if(WITH_GO_BINDING AND NOT OPEN_FOR_IDE)
add_subdirectory(go)
endif()
if(WITH_RUBY)
if(WITH_RUBY_BINDING)
add_subdirectory(ruby)
endif()
if(NOT WIN32 AND NOT OPEN_FOR_IDE)

View File

@ -332,9 +332,10 @@ def transaction(logger):
output7 = run_fdbcli_command('get', 'key')
assert output7 == "`key': not found"
def get_fdb_process_addresses():
def get_fdb_process_addresses(logger):
# get all processes' network addresses
output = run_fdbcli_command('kill')
logger.debug(output)
# skip the first line; each remaining line is one process
addresses = output.split('\n')[1:]
assert len(addresses) == process_number
@ -354,7 +355,7 @@ def coordinators(logger):
assert coordinator_list[0]['address'] == coordinators
# verify the cluster description
assert get_value_from_status_json(True, 'cluster', 'connection_string').startswith('{}:'.format(cluster_description))
addresses = get_fdb_process_addresses()
addresses = get_fdb_process_addresses(logger)
# set all 5 processes as coordinators and update the cluster description
new_cluster_description = 'a_simple_description'
run_fdbcli_command('coordinators', *addresses, 'description={}'.format(new_cluster_description))
@ -369,7 +370,7 @@ def coordinators(logger):
@enable_logging()
def exclude(logger):
# get all processes' network addresses
addresses = get_fdb_process_addresses()
addresses = get_fdb_process_addresses(logger)
logger.debug("Cluster processes: {}".format(' '.join(addresses)))
# There should be no excluded process for now
no_excluded_process_output = 'There are currently no servers or localities excluded from the database.'
@ -377,16 +378,28 @@ def exclude(logger):
assert no_excluded_process_output in output1
# randomly pick one and exclude the process
excluded_address = random.choice(addresses)
# If we see the "not enough space" error, use the FORCE option to proceed
# this should be a safe operation, as the test does not need any storage space
force = False
# sometimes we need to retry the exclude
while True:
logger.debug("Excluding process: {}".format(excluded_address))
error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
if force:
error_message = run_fdbcli_command_and_get_error('exclude', 'FORCE', excluded_address)
else:
error_message = run_fdbcli_command_and_get_error('exclude', excluded_address)
if error_message == 'WARNING: {} is a coordinator!'.format(excluded_address):
# excluding a coordinator prints a warning; verify the randomly selected process is the coordinator
coordinator_list = get_value_from_status_json(True, 'client', 'coordinators', 'coordinators')
assert len(coordinator_list) == 1
assert coordinator_list[0]['address'] == excluded_address
break
elif 'ERROR: This exclude may cause the total free space in the cluster to drop below 10%.' in error_message:
# excluding the process may leave the cluster with insufficient free space
# use the FORCE option to ignore the warning and proceed
assert not force
force = True
logger.debug("Use FORCE option to exclude the process")
elif not error_message:
break
else:
@ -436,7 +449,8 @@ if __name__ == '__main__':
# assertions will fail if fdbcli does not work as expected
process_number = int(sys.argv[3])
if process_number == 1:
advanceversion()
# TODO: disabled for now; the change can make the database unavailable
#advanceversion()
cache_range()
consistencycheck()
datadistribution()
@ -449,10 +463,7 @@ if __name__ == '__main__':
throttle()
else:
assert process_number > 1, "Process number should be greater than 1"
# the kill command which used to list processes seems to not work as expected sometime
# which makes the test flaky.
# We need to figure out the reason and then re-enable these tests
#coordinators()
#exclude()
coordinators()
exclude()
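
The exclude test above encodes a small escalation state machine: retry the command, and switch on FORCE exactly once when the free-space check fires. A standalone C++17 sketch of that loop follows; `runExclude` is a stub standing in for shelling out to fdbcli, and all names are illustrative rather than part of the test harness.

```cpp
#include <iostream>
#include <string>

// Stub standing in for `fdbcli exclude`; returns the error text, empty on success.
// Illustrative only -- the real test shells out to fdbcli.
std::string runExclude(const std::string& address, bool force) {
    static int calls = 0;
    if (!force && ++calls == 1)
        return "ERROR: This exclude may cause the total free space in the cluster to drop below 10%.";
    return ""; // success
}

int main() {
    const std::string address = "127.0.0.1:4500";
    bool force = false;
    while (true) {
        std::string error = runExclude(address, force);
        if (error.find("drop below 10%") != std::string::npos) {
            // Escalate exactly once: the test owns no data, so FORCE is safe here.
            if (force) { std::cerr << "FORCE did not help\n"; return 1; }
            force = true;
        } else if (error.empty()) {
            std::cout << "excluded " << address << (force ? " (with FORCE)" : "") << "\n";
            break;
        } else {
            std::cerr << "unexpected error: " << error << "\n";
            return 1;
        }
    }
    return 0;
}
```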

View File

@ -352,7 +352,7 @@ function(package_bindingtester)
COMMENT "Copy Flow tester for bindingtester")
set(generated_binding_files python/fdb/fdboptions.py)
if(WITH_JAVA)
if(WITH_JAVA_BINDING)
if(NOT FDB_RELEASE)
set(prerelease_string "-PRERELEASE")
else()
@ -369,7 +369,7 @@ function(package_bindingtester)
set(generated_binding_files ${generated_binding_files} java/foundationdb-tests.jar)
endif()
if(WITH_GO AND NOT OPEN_FOR_IDE)
if(WITH_GO_BINDING AND NOT OPEN_FOR_IDE)
add_dependencies(copy_binding_output_files fdb_go_tester fdb_go)
add_custom_command(
TARGET copy_binding_output_files

View File

@ -1,61 +1,73 @@
function(compile_boost)
# Initialize function incoming parameters
set(options)
set(oneValueArgs TARGET)
set(multiValueArgs BUILD_ARGS CXXFLAGS LDFLAGS)
cmake_parse_arguments(MY "${options}" "${oneValueArgs}"
cmake_parse_arguments(COMPILE_BOOST "${options}" "${oneValueArgs}"
"${multiValueArgs}" ${ARGN} )
# Configure the boost toolset to use
set(BOOTSTRAP_ARGS "--with-libraries=context")
set(B2_COMMAND "./b2")
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++14 -w)
# Configure bootstrap command
set(BOOTSTRAP_COMMAND "./bootstrap.sh")
set(BOOTSTRAP_LIBRARIES "context")
set(BOOST_CXX_COMPILER "${CMAKE_CXX_COMPILER}")
if(APPLE)
set(BOOST_TOOLSET "clang-darwin")
# this is to fix a weird macOS issue -- by default
# cmake would otherwise pass a compiler that can't
# compile boost
set(BOOST_CXX_COMPILER "/usr/bin/clang++")
elseif(CLANG)
if(CLANG)
set(BOOST_TOOLSET "clang")
list(APPEND BOOTSTRAP_ARGS "${BOOTSTRAP_COMMAND} --with-toolset=clang")
if(APPLE)
# this is to fix a weird macOS issue -- by default
# cmake would otherwise pass a compiler that can't
# compile boost
set(BOOST_CXX_COMPILER "/usr/bin/clang++")
endif()
else()
set(BOOST_TOOLSET "gcc")
endif()
if(APPLE OR USE_LIBCXX)
list(APPEND BOOST_COMPILER_FLAGS -stdlib=libc++)
endif()
set(BOOST_ADDITIONAL_COMPILE_OPTIOINS "")
foreach(flag IN LISTS BOOST_COMPILER_FLAGS MY_CXXFLAGS)
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIOINS "<cxxflags>${flag} ")
endforeach()
foreach(flag IN LISTS MY_LDFLAGS)
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIOINS "<linkflags>${flag} ")
endforeach()
configure_file(${CMAKE_SOURCE_DIR}/cmake/user-config.jam.cmake ${CMAKE_BINARY_DIR}/user-config.jam)
message(STATUS "Use ${BOOST_TOOLSET} to build boost")
# Configure b2 command
set(B2_COMMAND "./b2")
set(BOOST_COMPILER_FLAGS -fvisibility=hidden -fPIC -std=c++17 -w)
set(BOOST_LINK_FLAGS "")
if(APPLE OR CLANG OR USE_LIBCXX)
list(APPEND BOOST_COMPILER_FLAGS -stdlib=libc++ -nostdlib++)
list(APPEND BOOST_LINK_FLAGS -static-libgcc -lc++ -lc++abi)
endif()
# Update the user-config.jam
set(BOOST_ADDITIONAL_COMPILE_OPTIONS "")
foreach(flag IN LISTS BOOST_COMPILER_FLAGS COMPILE_BOOST_CXXFLAGS)
string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIONS "<cxxflags>${flag} ")
endforeach()
#foreach(flag IN LISTS BOOST_LINK_FLAGS COMPILE_BOOST_LDFLAGS)
# string(APPEND BOOST_ADDITIONAL_COMPILE_OPTIONS "<linkflags>${flag} ")
#endforeach()
configure_file(${CMAKE_SOURCE_DIR}/cmake/user-config.jam.cmake ${CMAKE_BINARY_DIR}/user-config.jam)
set(USER_CONFIG_FLAG --user-config=${CMAKE_BINARY_DIR}/user-config.jam)
# Build boost
include(ExternalProject)
set(BOOST_INSTALL_DIR "${CMAKE_BINARY_DIR}/boost_install")
ExternalProject_add("${MY_TARGET}Project"
ExternalProject_add("${COMPILE_BOOST_TARGET}Project"
URL "https://boostorg.jfrog.io/artifactory/main/release/1.72.0/source/boost_1_72_0.tar.bz2"
URL_HASH SHA256=59c9b274bc451cf91a9ba1dd2c7fdcaf5d60b1b3aa83f2c9fa143417cc660722
CONFIGURE_COMMAND ./bootstrap.sh ${BOOTSTRAP_ARGS}
BUILD_COMMAND ${B2_COMMAND} link=static ${MY_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
CONFIGURE_COMMAND ${BOOTSTRAP_COMMAND} ${BOOTSTRAP_ARGS} --with-libraries=${BOOTSTRAP_LIBRARIES} --with-toolset=${BOOST_TOOLSET}
BUILD_COMMAND ${B2_COMMAND} link=static ${COMPILE_BOOST_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
BUILD_IN_SOURCE ON
INSTALL_COMMAND ""
UPDATE_COMMAND ""
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/boost/config.hpp"
"${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${MY_TARGET}_context STATIC IMPORTED)
add_dependencies(${MY_TARGET}_context ${MY_TARGET}Project)
set_target_properties(${MY_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${COMPILE_BOOST_TARGET}_context STATIC IMPORTED)
add_dependencies(${COMPILE_BOOST_TARGET}_context ${COMPILE_BOOST_TARGET}Project)
set_target_properties(${COMPILE_BOOST_TARGET}_context PROPERTIES IMPORTED_LOCATION "${BOOST_INSTALL_DIR}/lib/libboost_context.a")
add_library(${MY_TARGET} INTERFACE)
target_include_directories(${MY_TARGET} SYSTEM INTERFACE ${BOOST_INSTALL_DIR}/include)
target_link_libraries(${MY_TARGET} INTERFACE ${MY_TARGET}_context)
endfunction()
add_library(${COMPILE_BOOST_TARGET} INTERFACE)
target_include_directories(${COMPILE_BOOST_TARGET} SYSTEM INTERFACE ${BOOST_INSTALL_DIR}/include)
target_link_libraries(${COMPILE_BOOST_TARGET} INTERFACE ${COMPILE_BOOST_TARGET}_context)
endfunction(compile_boost)
if(USE_SANITIZER)
if(WIN32)
@ -72,10 +84,20 @@ if(USE_SANITIZER)
return()
endif()
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0)
# Since Boost 1.72, Boost installs CMake config files; we will enforce config mode
set(Boost_USE_STATIC_LIBS ON)
set(BOOST_HINT_PATHS /opt/boost_1_72_0)
# Clang and GCC mangle std::call_once, etc. differently
if (UNIX AND CMAKE_CXX_COMPILER_ID MATCHES "Clang$")
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0_clang)
set(BOOST_HINT_PATHS /opt/boost_1_72_0_clang)
message(STATUS "Using Clang version of boost::context")
else ()
list(APPEND CMAKE_PREFIX_PATH /opt/boost_1_72_0)
set(BOOST_HINT_PATHS /opt/boost_1_72_0)
message(STATUS "Using g++ version of boost::context")
endif ()
if(BOOST_ROOT)
list(APPEND BOOST_HINT_PATHS ${BOOST_ROOT})
endif()

View File

@ -261,10 +261,6 @@ else()
if (CLANG)
add_compile_options()
# Clang has link errors unless `atomic` is specifically requested.
if(NOT APPLE)
#add_link_options(-latomic)
endif()
if (APPLE OR USE_LIBCXX)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
if (NOT APPLE)

View File

@ -47,22 +47,6 @@ else()
endif()
endif()
################################################################################
# Java Bindings
################################################################################
set(WITH_JAVA OFF)
find_package(JNI 1.8)
find_package(Java 1.8 COMPONENTS Development)
# leave FreeBSD JVM compat for later
if(JNI_FOUND AND Java_FOUND AND Java_Development_FOUND AND NOT (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD"))
set(WITH_JAVA ON)
include(UseJava)
enable_language(Java)
else()
set(WITH_JAVA OFF)
endif()
################################################################################
# Python Bindings
################################################################################
@ -75,12 +59,57 @@ else()
set(WITH_PYTHON OFF)
endif()
option(BUILD_PYTHON_BINDING "build python binding" ON)
if(NOT BUILD_PYTHON_BINDING OR NOT WITH_PYTHON)
set(WITH_PYTHON_BINDING OFF)
else()
if(WITH_PYTHON)
set(WITH_PYTHON_BINDING ON)
else()
#message(FATAL_ERROR "Could not find a suitable python interpreter")
set(WITH_PYTHON_BINDING OFF)
endif()
endif()
################################################################################
# C Bindings
################################################################################
option(BUILD_C_BINDING "build C binding" ON)
if(BUILD_C_BINDING AND WITH_PYTHON)
set(WITH_C_BINDING ON)
else()
set(WITH_C_BINDING OFF)
endif()
################################################################################
# Java Bindings
################################################################################
option(BUILD_JAVA_BINDING "build java binding" ON)
if(NOT BUILD_JAVA_BINDING OR NOT WITH_C_BINDING)
set(WITH_JAVA_BINDING OFF)
else()
set(WITH_JAVA_BINDING OFF)
find_package(JNI 1.8)
find_package(Java 1.8 COMPONENTS Development)
# leave FreeBSD JVM compat for later
if(JNI_FOUND AND Java_FOUND AND Java_Development_FOUND AND NOT (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD") AND WITH_C_BINDING)
set(WITH_JAVA_BINDING ON)
include(UseJava)
enable_language(Java)
else()
set(WITH_JAVA_BINDING OFF)
endif()
endif()
################################################################################
# Pip
################################################################################
option(BUILD_DOCUMENTATION "build documentation" ON)
find_package(Python3 COMPONENTS Interpreter)
if (Python3_Interpreter_FOUND)
if (WITH_PYTHON AND Python3_Interpreter_FOUND AND BUILD_DOCUMENTATION)
set(WITH_DOCUMENTATION ON)
else()
set(WITH_DOCUMENTATION OFF)
@ -90,27 +119,37 @@ endif()
# GO
################################################################################
find_program(GO_EXECUTABLE go)
# building the go binaries is currently not supported on Windows
if(GO_EXECUTABLE AND NOT WIN32)
set(WITH_GO ON)
option(BUILD_GO_BINDING "build go binding" ON)
if(NOT BUILD_GO_BINDING OR NOT BUILD_C_BINDING)
set(WITH_GO_BINDING OFF)
else()
set(WITH_GO OFF)
endif()
if (USE_SANITIZER)
# Disable building go for sanitizers, since _stacktester doesn't link properly
set(WITH_GO OFF)
find_program(GO_EXECUTABLE go)
# building the go binaries is currently not supported on Windows
if(GO_EXECUTABLE AND NOT WIN32 AND WITH_C_BINDING)
set(WITH_GO_BINDING ON)
else()
set(WITH_GO_BINDING OFF)
endif()
if (USE_SANITIZER)
# Disable building go for sanitizers, since _stacktester doesn't link properly
set(WITH_GO_BINDING OFF)
endif()
endif()
################################################################################
# Ruby
################################################################################
find_program(GEM_EXECUTABLE gem)
set(WITH_RUBY OFF)
if(GEM_EXECUTABLE)
set(GEM_COMMAND ${RUBY_EXECUTABLE} ${GEM_EXECUTABLE})
set(WITH_RUBY ON)
option(BUILD_RUBY_BINDING "build ruby binding" ON)
if(NOT BUILD_RUBY_BINDING OR NOT BUILD_C_BINDING)
set(WITH_RUBY_BINDING OFF)
else()
find_program(GEM_EXECUTABLE gem)
set(WITH_RUBY_BINDING OFF)
if(GEM_EXECUTABLE AND WITH_C_BINDING)
set(GEM_COMMAND ${RUBY_EXECUTABLE} ${GEM_EXECUTABLE})
set(WITH_RUBY_BINDING ON)
endif()
endif()
################################################################################
@ -160,20 +199,22 @@ function(print_components)
message(STATUS "=========================================")
message(STATUS " Components Build Overview ")
message(STATUS "=========================================")
message(STATUS "Build Java Bindings: ${WITH_JAVA}")
message(STATUS "Build with TLS support: ${WITH_TLS}")
message(STATUS "Build Go bindings: ${WITH_GO}")
message(STATUS "Build Ruby bindings: ${WITH_RUBY}")
message(STATUS "Build Python sdist (make package): ${WITH_PYTHON}")
message(STATUS "Build Documentation (make html): ${WITH_DOCUMENTATION}")
message(STATUS "Build Bindings (depends on Python): ${WITH_PYTHON}")
message(STATUS "Build C Bindings: ${WITH_C_BINDING}")
message(STATUS "Build Python Bindings: ${WITH_PYTHON_BINDING}")
message(STATUS "Build Java Bindings: ${WITH_JAVA_BINDING}")
message(STATUS "Build Go bindings: ${WITH_GO_BINDING}")
message(STATUS "Build Ruby bindings: ${WITH_RUBY_BINDING}")
message(STATUS "Build with TLS support: ${WITH_TLS}")
message(STATUS "Build Documentation (make html): ${WITH_DOCUMENTATION}")
message(STATUS "Build Python sdist (make package): ${WITH_PYTHON_BINDING}")
message(STATUS "Configure CTest (depends on Python): ${WITH_PYTHON}")
message(STATUS "Build with RocksDB: ${WITH_ROCKSDB_EXPERIMENTAL}")
message(STATUS "=========================================")
endfunction()
if(FORCE_ALL_COMPONENTS)
if(NOT WITH_JAVA OR NOT WITH_TLS OR NOT WITH_GO OR NOT WITH_RUBY OR NOT WITH_PYTHON OR NOT WITH_DOCUMENTATION)
if(NOT WITH_C_BINDING OR NOT WITH_JAVA_BINDING OR NOT WITH_TLS OR NOT WITH_GO_BINDING OR NOT WITH_RUBY_BINDING OR NOT WITH_PYTHON_BINDING OR NOT WITH_DOCUMENTATION)
print_components()
message(FATAL_ERROR "FORCE_ALL_COMPONENTS is set but not all dependencies could be found")
endif()

View File

@ -1 +1 @@
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIOINS@ ;
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIONS@ ;

View File

@ -2,10 +2,17 @@
Release Notes
#############
6.3.19
======
* Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5330) <https://github.com/apple/foundationdb/pull/5330>`_
6.3.18
======
* The multi-version client API would not propagate errors that occurred when creating databases on external clients. This could result in invalid memory accesses. `(PR #5221) <https://github.com/apple/foundationdb/pull/5221>`_
* Fixed a race between the multi-version client connecting to a cluster and destroying the database that could cause an assertion failure. `(PR #5221) <https://github.com/apple/foundationdb/pull/5221>`_
* Added Mako latency measurements. `(PR #5255) <https://github.com/apple/foundationdb/pull/5255>`_
* Fixed a bug introduced when porting the restore-inconsistent-snapshot feature from the 7.0 branch to the 6.3 branch. The parameter that controls whether to perform an inconsistent snapshot restore may instead be used to lock the database during restore. `(PR #5228) <https://github.com/apple/foundationdb/pull/5228>`_
* Added SidebandMultiThreadClientTest, which validates causal consistency for the multi-threaded client. `(PR #5173) <https://github.com/apple/foundationdb/pull/5173>`_
6.3.17
======

View File

@ -29,6 +29,7 @@ Features
* Added perpetual storage wiggle that supports less impactful B-trees recreation and data migration. These will also be used for deploying the Testing Storage Server which compares 2 storage engines' results. See :ref:`Documentation <perpetual-storage-wiggle>` for details. `(PR #4838) <https://github.com/apple/foundationdb/pull/4838>`_
* Improved the efficiency with which storage servers replicate data between themselves. `(PR #5017) <https://github.com/apple/foundationdb/pull/5017>`_
* Added support to ``exclude command`` to exclude based on locality match. `(PR #5113) <https://github.com/apple/foundationdb/pull/5113>`_
* Add the ``trace_partial_file_suffix`` network option. This option will give unfinished trace files a special suffix to indicate they're not complete yet. When the trace file is complete, it is renamed to remove the suffix. `(PR #5328) <https://github.com/apple/foundationdb/pull/5328>`_
Performance
-----------

View File

@ -506,7 +506,7 @@ ACTOR Future<Void> decode_logs(DecodeParams params) {
wait(process_file(container, logs[idx], uid, params));
idx++;
}
TraceEvent("DecodeDone", uid);
TraceEvent("DecodeDone", uid).log();
return Void();
}
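
The `.log()` call added here makes the emission point explicit. For readers unfamiliar with the pattern, here is a minimal standalone sketch of an RAII event builder in that style: fields accumulate through `detail()`, and the event is written out either on an explicit `log()` or, failing that, when the object is destroyed. This illustrates the pattern only; it is not FDB's actual TraceEvent implementation.

```cpp
#include <iostream>
#include <sstream>
#include <string>

// Minimal RAII event builder: fields accumulate via detail(), and the event
// is emitted at destruction unless log() already emitted it explicitly.
class Event {
    std::ostringstream line;
    bool logged = false;
public:
    explicit Event(const std::string& type) { line << "Event=" << type; }
    template <class T>
    Event& detail(const std::string& key, const T& value) {
        line << " " << key << "=" << value;
        return *this;
    }
    void log() {
        if (!logged) { std::cout << line.str() << "\n"; logged = true; }
    }
    ~Event() { log(); }
};

int main() {
    Event("DecodeDone").log();               // explicit emission, as in the diff
    Event("Progress").detail("Index", 42);   // emitted implicitly at end of statement
}
```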

View File

@ -1667,7 +1667,7 @@ ACTOR Future<Void> cleanupStatus(Reference<ReadYourWritesTransaction> tr,
readMore = true;
} catch (Error& e) {
// If doc can't be parsed or isn't alive, delete it.
TraceEvent(SevWarn, "RemovedDeadBackupLayerStatus").detail("Key", docs[i].key);
TraceEvent(SevWarn, "RemovedDeadBackupLayerStatus").detail("Key", docs[i].key).error(e, true);
tr->clear(docs[i].key);
// If limit is 1 then read more.
if (limit == 1)

View File

@ -7,6 +7,7 @@ set(FDBCLI_SRCS
FlowLineNoise.h
ForceRecoveryWithDataLossCommand.actor.cpp
MaintenanceCommand.actor.cpp
SetClassCommand.actor.cpp
SnapshotCommand.actor.cpp
ThrottleCommand.actor.cpp
Util.cpp

View File

@ -0,0 +1,123 @@
/*
* SetClassCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2021 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
ACTOR Future<Void> printProcessClass(Reference<IDatabase> db) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
// Hold the reference to the memory
state ThreadFuture<RangeResult> classTypeFuture =
tr->getRange(fdb_cli::processClassTypeSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
state ThreadFuture<RangeResult> classSourceFuture =
tr->getRange(fdb_cli::processClassSourceSpecialKeyRange, CLIENT_KNOBS->TOO_MANY);
wait(success(safeThreadFutureToFuture(classSourceFuture)) &&
success(safeThreadFutureToFuture(classTypeFuture)));
RangeResult processTypeList = classTypeFuture.get();
RangeResult processSourceList = classSourceFuture.get();
ASSERT(processSourceList.size() == processTypeList.size());
if (!processTypeList.size()) {
printf("No processes are registered in the database.\n");
return Void();
}
printf("There are currently %zu processes in the database:\n", processTypeList.size());
for (int index = 0; index < processTypeList.size(); index++) {
std::string address =
processTypeList[index].key.removePrefix(fdb_cli::processClassTypeSpecialKeyRange.begin).toString();
// check the addresses are the same in each list
std::string addressFromSourceList =
processSourceList[index]
.key.removePrefix(fdb_cli::processClassSourceSpecialKeyRange.begin)
.toString();
ASSERT(address == addressFromSourceList);
printf(" %s: %s (%s)\n",
address.c_str(),
processTypeList[index].value.toString().c_str(),
processSourceList[index].value.toString().c_str());
}
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
};
ACTOR Future<bool> setProcessClass(Reference<IDatabase> db, KeyRef network_address, KeyRef class_type) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
try {
tr->set(network_address.withPrefix(fdb_cli::processClassTypeSpecialKeyRange.begin), class_type);
wait(safeThreadFutureToFuture(tr->commit()));
return true;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
} // namespace
namespace fdb_cli {
const KeyRangeRef processClassSourceSpecialKeyRange =
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/process/class_source/"),
LiteralStringRef("\xff\xff/configuration/process/class_source0"));
const KeyRangeRef processClassTypeSpecialKeyRange =
KeyRangeRef(LiteralStringRef("\xff\xff/configuration/process/class_type/"),
LiteralStringRef("\xff\xff/configuration/process/class_type0"));
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() != 3 && tokens.size() != 1) {
printUsage(tokens[0]);
return false;
} else if (tokens.size() == 1) {
wait(printProcessClass(db));
} else {
bool successful = wait(setProcessClass(db, tokens[1], tokens[2]));
return successful;
}
return true;
}
CommandFactory setClassFactory(
"setclass",
CommandHelp("setclass [<ADDRESS> <CLASS>]",
"change the class of a process",
"If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
"`default' resets the process class to the class specified on the command line. The available "
"classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
"`master', `test', "
"`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
"`coordinator', `ratekeeper', `storage_cache', `backup', and `default'."));
} // namespace fdb_cli
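
`printProcessClass` issues the two special-key range reads before waiting on either, then treats the results as parallel arrays keyed by address. Below is a self-contained sketch of that shape using `std::async`, with stub fetchers in place of the transaction reads (all names hypothetical):

```cpp
#include <cassert>
#include <future>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using Rows = std::vector<std::pair<std::string, std::string>>; // (address, value)

// Stubs standing in for the two special-key range reads; illustrative only.
Rows fetchClassTypes()   { return { { "10.0.0.1:4500", "storage" } }; }
Rows fetchClassSources() { return { { "10.0.0.1:4500", "command_line" } }; }

int main() {
    // Start both reads before waiting on either, as printProcessClass does.
    auto typesFut   = std::async(std::launch::async, fetchClassTypes);
    auto sourcesFut = std::async(std::launch::async, fetchClassSources);
    Rows types = typesFut.get();
    Rows sources = sourcesFut.get();

    // The two ranges are parallel arrays keyed by address; verify alignment.
    assert(types.size() == sources.size());
    for (size_t i = 0; i < types.size(); ++i) {
        assert(types[i].first == sources[i].first);
        std::cout << "  " << types[i].first << ": " << types[i].second
                  << " (" << sources[i].second << ")\n";
    }
}
```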

View File

@ -566,15 +566,6 @@ void initHelp() {
"pair in <ADDRESS...> or any LocalityData (like dcid, zoneid, machineid, processid), removes any "
"matching exclusions from the excluded servers and localities list. "
"(A specified IP will match all IP:* exclusion entries)");
helpMap["setclass"] =
CommandHelp("setclass [<ADDRESS> <CLASS>]",
"change the class of a process",
"If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
"`default' resets the process class to the class specified on the command line. The available "
"classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
"`master', `test', "
"`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
"`coordinator', `ratekeeper', `storage_cache', `backup', and `default'.");
helpMap["status"] =
CommandHelp("status [minimal|details|json]",
"get the status of a FoundationDB cluster",
@ -2742,45 +2733,6 @@ ACTOR Future<bool> createSnapshot(Database db, std::vector<StringRef> tokens) {
return false;
}
ACTOR Future<bool> setClass(Database db, std::vector<StringRef> tokens) {
if (tokens.size() == 1) {
vector<ProcessData> _workers = wait(makeInterruptable(getWorkers(db)));
auto workers = _workers; // strip const
if (!workers.size()) {
printf("No processes are registered in the database.\n");
return false;
}
std::sort(workers.begin(), workers.end(), ProcessData::sort_by_address());
printf("There are currently %zu processes in the database:\n", workers.size());
for (const auto& w : workers)
printf(" %s: %s (%s)\n",
w.address.toString().c_str(),
w.processClass.toString().c_str(),
w.processClass.sourceString().c_str());
return false;
}
AddressExclusion addr = AddressExclusion::parse(tokens[1]);
if (!addr.isValid()) {
fprintf(stderr, "ERROR: '%s' is not a valid network endpoint address\n", tokens[1].toString().c_str());
if (tokens[1].toString().find(":tls") != std::string::npos)
printf(" Do not include the `:tls' suffix when naming a process\n");
return true;
}
ProcessClass processClass(tokens[2].toString(), ProcessClass::DBSource);
if (processClass.classType() == ProcessClass::InvalidClass && tokens[2] != LiteralStringRef("default")) {
fprintf(stderr, "ERROR: '%s' is not a valid process class\n", tokens[2].toString().c_str());
return true;
}
wait(makeInterruptable(setClass(db, addr, processClass)));
return false;
};
Reference<ReadYourWritesTransaction> getTransaction(Database db,
Reference<ReadYourWritesTransaction>& tr,
FdbOptions* options,
@ -3689,14 +3641,9 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
if (tokencmp(tokens[0], "setclass")) {
if (tokens.size() != 3 && tokens.size() != 1) {
printUsage(tokens[0]);
bool _result = wait(makeInterruptable(setClassCommandActor(db2, tokens)));
if (!_result)
is_error = true;
} else {
bool err = wait(setClass(db, tokens));
if (err)
is_error = true;
}
continue;
}

View File

@ -64,7 +64,9 @@ extern const KeyRef consistencyCheckSpecialKey;
// maintenance
extern const KeyRangeRef maintenanceSpecialKeyRange;
extern const KeyRef ignoreSSFailureSpecialKey;
// setclass
extern const KeyRangeRef processClassSourceSpecialKeyRange;
extern const KeyRangeRef processClassTypeSpecialKeyRange;
// helper functions (copied from fdbcli.actor.cpp)
// compare StringRef with the given c string
@ -81,6 +83,8 @@ ACTOR Future<bool> consistencyCheckCommandActor(Reference<ITransaction> tr, std:
ACTOR Future<bool> forceRecoveryWithDataLossCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// maintenance command
ACTOR Future<bool> maintenanceCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// setclass command
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// snapshot command
ACTOR Future<bool> snapshotCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// throttle command

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <atomic>
#include "fdbclient/AsyncTaskThread.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -30,13 +32,22 @@ public:
bool isTerminate() const override { return true; }
};
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, int* sum, int count) {
ACTOR Future<Void> asyncTaskThreadClient(AsyncTaskThread* asyncTaskThread, std::atomic<int> *sum, int count, int clientId, double meanSleep) {
state int i = 0;
state double randomSleep = 0.0;
for (; i < count; ++i) {
randomSleep = deterministicRandom()->random01() * 2 * meanSleep;
wait(delay(randomSleep));
wait(asyncTaskThread->execAsync([sum = sum] {
++(*sum);
sum->fetch_add(1);
return Void();
}));
TraceEvent("AsyncTaskThreadIncrementedSum")
.detail("Index", i)
.detail("Sum", sum->load())
.detail("ClientId", clientId)
.detail("RandomSleep", randomSleep)
.detail("MeanSleep", meanSleep);
}
return Void();
}
@ -51,7 +62,7 @@ AsyncTaskThread::~AsyncTaskThread() {
bool wakeUp = false;
{
std::lock_guard<std::mutex> g(m);
wakeUp = queue.push(std::make_shared<TerminateTask>());
wakeUp = queue.push(std::make_unique<TerminateTask>());
}
if (wakeUp) {
cv.notify_one();
@ -61,7 +72,7 @@ AsyncTaskThread::~AsyncTaskThread() {
void AsyncTaskThread::run(AsyncTaskThread* self) {
while (true) {
std::shared_ptr<IAsyncTask> task;
std::unique_ptr<IAsyncTask> task;
{
std::unique_lock<std::mutex> lk(self->m);
self->cv.wait(lk, [self] { return !self->queue.canSleep(); });
@ -75,14 +86,30 @@ void AsyncTaskThread::run(AsyncTaskThread* self) {
}
TEST_CASE("/asynctaskthread/add") {
state int sum = 0;
state std::atomic<int> sum = 0;
state AsyncTaskThread asyncTaskThread;
state int numClients = 10;
state int incrementsPerClient = 100;
std::vector<Future<Void>> clients;
clients.reserve(10);
for (int i = 0; i < 10; ++i) {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, 100));
clients.reserve(numClients);
for (int clientId = 0; clientId < numClients; ++clientId) {
clients.push_back(asyncTaskThreadClient(&asyncTaskThread, &sum, incrementsPerClient, clientId, deterministicRandom()->random01() * 0.01));
}
wait(waitForAll(clients));
ASSERT_EQ(sum, 1000);
ASSERT_EQ(sum.load(), numClients * incrementsPerClient);
return Void();
}
TEST_CASE("/asynctaskthread/error") {
state AsyncTaskThread asyncTaskThread;
try {
wait(asyncTaskThread.execAsync([]{
throw operation_failed();
return Void();
}));
ASSERT(false);
} catch (Error &e) {
ASSERT_EQ(e.code(), error_code_operation_failed);
}
return Void();
}
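
Two things changed in these tests: the shared counter became a `std::atomic<int>` so concurrent clients can increment it safely, and a new test checks that an error thrown inside a task surfaces to the waiter. Both behaviors can be demonstrated with nothing but the standard library; the sketch below uses plain threads and `std::future` in place of flow futures.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <stdexcept>
#include <thread>
#include <vector>

int main() {
    // Many clients bumping one counter: std::atomic makes the increments safe
    // without a lock, which is why the test switched `sum` from int to atomic.
    std::atomic<int> sum{0};
    const int numClients = 10, incrementsPerClient = 100;
    std::vector<std::thread> clients;
    for (int c = 0; c < numClients; ++c)
        clients.emplace_back([&] {
            for (int i = 0; i < incrementsPerClient; ++i)
                sum.fetch_add(1);
        });
    for (auto& t : clients) t.join();
    assert(sum.load() == numClients * incrementsPerClient);

    // Error propagation, as in the /asynctaskthread/error test: an exception
    // thrown inside the task surfaces when the caller waits on the result.
    auto fut = std::async(std::launch::async, []() -> int {
        throw std::runtime_error("operation_failed");
    });
    try {
        fut.get();
        assert(false);
    } catch (const std::runtime_error&) {
        // expected
    }
}
```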

View File

@ -48,7 +48,7 @@ public:
};
class AsyncTaskThread {
ThreadSafeQueue<std::shared_ptr<IAsyncTask>> queue;
ThreadSafeQueue<std::unique_ptr<IAsyncTask>> queue;
std::condition_variable cv;
std::mutex m;
std::thread thread;
@ -60,7 +60,7 @@ class AsyncTaskThread {
bool wakeUp = false;
{
std::lock_guard<std::mutex> g(m);
wakeUp = queue.push(std::make_shared<AsyncTask<F>>(func));
wakeUp = queue.push(std::make_unique<AsyncTask<F>>(func));
}
if (wakeUp) {
cv.notify_one();
@ -88,6 +88,7 @@ public:
auto funcResult = func();
onMainThreadVoid([promise, funcResult] { promise.send(funcResult); }, nullptr, priority);
} catch (Error& e) {
TraceEvent("ErrorExecutingAsyncTask").error(e);
onMainThreadVoid([promise, e] { promise.sendError(e); }, nullptr, priority);
}
});

View File

@ -717,11 +717,22 @@ protected:
template <>
inline Tuple Codec<Reference<IBackupContainer>>::pack(Reference<IBackupContainer> const& bc) {
return Tuple().append(StringRef(bc->getURL()));
Tuple tuple;
tuple.append(StringRef(bc->getURL()));
if (bc->getEncryptionKeyFileName().present()) {
tuple.append(bc->getEncryptionKeyFileName().get());
}
return tuple;
}
template <>
inline Reference<IBackupContainer> Codec<Reference<IBackupContainer>>::unpack(Tuple const& val) {
return IBackupContainer::openContainer(val.getString(0).toString());
ASSERT(val.size() == 1 || val.size() == 2);
auto url = val.getString(0).toString();
Optional<std::string> encryptionKeyFileName;
if (val.size() == 2) {
encryptionKeyFileName = val.getString(1).toString();
}
return IBackupContainer::openContainer(url, encryptionKeyFileName);
}
class BackupConfig : public KeyBackedConfig {
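
The new Codec appends the encryption key file name only when present, and the decoder branches on tuple size so old one-element tuples still round-trip. A small sketch of that optional-trailing-field scheme, with a plain vector of strings standing in for FDB's Tuple:

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Stand-in for the backup tuple: a flat list of strings. The encryption key
// file name is an optional trailing element, so old one-element tuples still
// decode -- illustrative of the Codec change, not FDB's Tuple class.
using Tuple = std::vector<std::string>;

struct Container {
    std::string url;
    std::optional<std::string> encryptionKeyFileName;
};

Tuple pack(const Container& bc) {
    Tuple t{ bc.url };
    if (bc.encryptionKeyFileName)
        t.push_back(*bc.encryptionKeyFileName);
    return t;
}

Container unpack(const Tuple& t) {
    assert(t.size() == 1 || t.size() == 2);
    Container bc{ t[0], std::nullopt };
    if (t.size() == 2)
        bc.encryptionKeyFileName = t[1];
    return bc;
}

int main() {
    Container plain{ "file:///backups/x", std::nullopt };
    Container enc{ "azure://acct@host/c", "key.bin" };
    assert(!unpack(pack(plain)).encryptionKeyFileName.has_value());
    assert(unpack(pack(enc)).encryptionKeyFileName == std::optional<std::string>("key.bin"));
}
```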

View File

@ -330,7 +330,7 @@ void decodeBackupLogValue(Arena& arena,
}
} else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("LogValue", logValue.toString()).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
//TraceEvent("ApplyMutation").detail("LogValue", logValue).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
if (version > ver && ver != invalidVersion) {
if (removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);

View File

@ -284,11 +284,11 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
#ifdef BUILD_AZURE_BACKUP
else if (u.startsWith("azure://"_sr)) {
u.eat("azure://"_sr);
auto address = NetworkAddress::parse(u.eat("/"_sr).toString());
auto accountName = u.eat("@"_sr).toString();
auto endpoint = u.eat("/"_sr).toString();
auto containerName = u.eat("/"_sr).toString();
auto accountName = u.eat("/"_sr).toString();
r = makeReference<BackupContainerAzureBlobStore>(
address, containerName, accountName, encryptionKeyFileName);
endpoint, accountName, containerName, encryptionKeyFileName);
}
#endif
else {
@ -296,6 +296,7 @@ Reference<IBackupContainer> IBackupContainer::openContainer(const std::string& u
throw backup_invalid_url();
}
r->encryptionKeyFileName = encryptionKeyFileName;
r->URL = url;
return r;
} catch (Error& e) {
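
The URL scheme changes here from `azure://<ip>:<port>/<account>/...` to `azure://<accountname>@<endpoint>/<container>/`, parsed with successive `eat` calls. Below is a standalone C++17 re-implementation of that eat-style parsing over `std::string_view`, assuming the new layout:

```cpp
#include <cassert>
#include <string>
#include <string_view>

// eat(sep): return everything before the first `sep` and advance past it,
// mimicking StringRef::eat. If `sep` is absent, consume the rest.
std::string eat(std::string_view& s, std::string_view sep) {
    size_t pos = s.find(sep);
    std::string out(s.substr(0, pos));
    s.remove_prefix(pos == std::string_view::npos ? s.size() : pos + sep.size());
    return out;
}

int main() {
    // New layout from the diff: azure://<accountname>@<endpoint>/<container>/
    std::string_view u = "azure://myaccount@myaccount.blob.core.windows.net/backups/";
    eat(u, "azure://");
    std::string accountName = eat(u, "@");
    std::string endpoint = eat(u, "/");
    std::string containerName = eat(u, "/");
    assert(accountName == "myaccount");
    assert(endpoint == "myaccount.blob.core.windows.net");
    assert(containerName == "backups");
}
```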

View File

@ -298,12 +298,14 @@ public:
static std::vector<std::string> getURLFormats();
static Future<std::vector<std::string>> listContainers(const std::string& baseURL);
std::string getURL() const { return URL; }
std::string const &getURL() const { return URL; }
Optional<std::string> const &getEncryptionKeyFileName() const { return encryptionKeyFileName; }
static std::string lastOpenError;
private:
std::string URL;
Optional<std::string> encryptionKeyFileName;
};
namespace fileBackup {

View File

@ -20,37 +20,69 @@
#include "fdbclient/BackupContainerAzureBlobStore.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include <future>
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
std::string const notFoundErrorCode = "404";
void printAzureError(std::string const& operationName, azure::storage_lite::storage_error const& err) {
printf("(%s) : Error from Azure SDK : %s (%s) : %s",
operationName.c_str(),
err.code_name.c_str(),
err.code.c_str(),
err.message.c_str());
}
template <class T>
T waitAzureFuture(std::future<azure::storage_lite::storage_outcome<T>>&& f, std::string const& operationName) {
auto outcome = f.get();
if (outcome.success()) {
return outcome.response();
} else {
printAzureError(operationName, outcome.error());
throw backup_error();
}
}
} // namespace
class BackupContainerAzureBlobStoreImpl {
public:
using AzureClient = azure::storage_lite::blob_client;
class ReadFile final : public IAsyncFile, ReferenceCounted<ReadFile> {
AsyncTaskThread& asyncTaskThread;
AsyncTaskThread* asyncTaskThread;
std::string containerName;
std::string blobName;
AzureClient* client;
std::shared_ptr<AzureClient> client;
public:
ReadFile(AsyncTaskThread& asyncTaskThread,
const std::string& containerName,
const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
std::shared_ptr<AzureClient> const& client)
: asyncTaskThread(&asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<ReadFile>::addref(); }
void delref() override { ReferenceCounted<ReadFile>::delref(); }
Future<int> read(void* data, int length, int64_t offset) {
return asyncTaskThread.execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
data,
length,
offset] {
Future<int> read(void* data, int length, int64_t offset) override {
TraceEvent(SevDebug, "BCAzureBlobStoreRead")
.detail("Length", length)
.detail("Offset", offset)
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
return asyncTaskThread->execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
data,
length,
offset] {
std::ostringstream oss(std::ios::out | std::ios::binary);
client->download_blob_to_stream(containerName, blobName, offset, length, oss);
waitAzureFuture(client->download_blob_to_stream(containerName, blobName, offset, length, oss),
"download_blob_to_stream");
auto str = std::move(oss).str();
memcpy(data, str.c_str(), str.size());
return static_cast<int>(str.size());
@ -61,19 +93,23 @@ public:
Future<Void> truncate(int64_t size) override { throw file_not_writable(); }
Future<Void> sync() override { throw file_not_writable(); }
Future<int64_t> size() const override {
return asyncTaskThread.execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName] {
return static_cast<int64_t>(client->get_blob_properties(containerName, blobName).get().response().size);
});
TraceEvent(SevDebug, "BCAzureBlobStoreReadFileSize")
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
return asyncTaskThread->execAsync(
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
auto resp =
waitAzureFuture(client->get_blob_properties(containerName, blobName), "get_blob_properties");
return static_cast<int64_t>(resp.size);
});
}
std::string getFilename() const override { return blobName; }
int64_t debugFD() const override { return 0; }
};
class WriteFile final : public IAsyncFile, ReferenceCounted<WriteFile> {
AsyncTaskThread& asyncTaskThread;
AzureClient* client;
AsyncTaskThread* asyncTaskThread;
std::shared_ptr<AzureClient> client;
std::string containerName;
std::string blobName;
int64_t m_cursor{ 0 };
@ -88,8 +124,8 @@ public:
WriteFile(AsyncTaskThread& asyncTaskThread,
const std::string& containerName,
const std::string& blobName,
AzureClient* client)
: asyncTaskThread(asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
std::shared_ptr<AzureClient> const& client)
: asyncTaskThread(&asyncTaskThread), containerName(containerName), blobName(blobName), client(client) {}
void addref() override { ReferenceCounted<WriteFile>::addref(); }
void delref() override { ReferenceCounted<WriteFile>::delref(); }
@ -114,22 +150,33 @@ public:
return Void();
}
Future<Void> sync() override {
TraceEvent(SevDebug, "BCAzureBlobStoreSync")
.detail("Length", buffer.size())
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
auto movedBuffer = std::move(buffer);
buffer.clear();
return asyncTaskThread.execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
buffer = std::move(movedBuffer)] {
std::istringstream iss(std::move(buffer));
auto resp = client->append_block_from_stream(containerName, blobName, iss).get();
return Void();
});
buffer = {};
if (!movedBuffer.empty()) {
return asyncTaskThread->execAsync([client = this->client,
containerName = this->containerName,
blobName = this->blobName,
buffer = std::move(movedBuffer)] {
std::istringstream iss(std::move(buffer));
waitAzureFuture(client->append_block_from_stream(containerName, blobName, iss),
"append_block_from_stream");
return Void();
});
}
return Void();
}
Future<int64_t> size() const override {
return asyncTaskThread.execAsync(
TraceEvent(SevDebug, "BCAzureBlobStoreSize")
.detail("ContainerName", containerName)
.detail("BlobName", blobName);
return asyncTaskThread->execAsync(
[client = this->client, containerName = this->containerName, blobName = this->blobName] {
auto resp = client->get_blob_properties(containerName, blobName).get().response();
ASSERT(resp.valid()); // TODO: Should instead throw here
auto resp =
waitAzureFuture(client->get_blob_properties(containerName, blobName), "get_blob_properties");
return static_cast<int64_t>(resp.size);
});
}
@ -163,44 +210,53 @@ public:
static bool isDirectory(const std::string& blobName) { return blobName.size() && blobName.back() == '/'; }
// Hack to get around the fact that macros don't work inside actor functions
static Reference<IAsyncFile> encryptFile(Reference<IAsyncFile> const& f, AsyncFileEncrypted::Mode mode) {
Reference<IAsyncFile> result = f;
#if ENCRYPTION_ENABLED
result = makeReference<AsyncFileEncrypted>(result, mode);
#endif
return result;
}
ACTOR static Future<Reference<IAsyncFile>> readFile(BackupContainerAzureBlobStore* self, std::string fileName) {
bool exists = wait(self->blobExists(fileName));
if (!exists) {
throw file_not_found();
}
Reference<IAsyncFile> f =
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
#if ENCRYPTION_ENABLED
makeReference<ReadFile>(self->asyncTaskThread, self->containerName, fileName, self->client);
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, false);
f = encryptFile(f, AsyncFileEncrypted::Mode::READ_ONLY);
}
#endif
return f;
}
ACTOR static Future<Reference<IBackupFile>> writeFile(BackupContainerAzureBlobStore* self, std::string fileName) {
TraceEvent(SevDebug, "BCAzureBlobStoreCreateWriteFile")
.detail("ContainerName", self->containerName)
.detail("FileName", fileName);
wait(self->asyncTaskThread.execAsync(
[client = self->client.get(), containerName = self->containerName, fileName = fileName] {
auto outcome = client->create_append_blob(containerName, fileName).get();
[client = self->client, containerName = self->containerName, fileName = fileName] {
waitAzureFuture(client->create_append_blob(containerName, fileName), "create_append_blob");
return Void();
}));
auto f = makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client.get());
#if ENCRYPTION_ENABLED
Reference<IAsyncFile> f =
makeReference<WriteFile>(self->asyncTaskThread, self->containerName, fileName, self->client);
if (self->usesEncryption()) {
f = makeReference<AsyncFileEncrypted>(f, true);
f = encryptFile(f, AsyncFileEncrypted::Mode::APPEND_ONLY);
}
#endif
return makeReference<BackupFile>(fileName, f);
}
static void listFiles(AzureClient* client,
static void listFiles(std::shared_ptr<AzureClient> const& client,
const std::string& containerName,
const std::string& path,
std::function<bool(std::string const&)> folderPathFilter,
BackupContainerFileSystem::FilesAndSizesT& result) {
auto resp = client->list_blobs_segmented(containerName, "/", "", path).get().response();
auto resp = waitAzureFuture(client->list_blobs_segmented(containerName, "/", "", path), "list_blobs_segmented");
for (const auto& blob : resp.blobs) {
if (isDirectory(blob.name) && folderPathFilter(blob.name)) {
if (isDirectory(blob.name) && (!folderPathFilter || folderPathFilter(blob.name))) {
listFiles(client, containerName, blob.name, folderPathFilter, result);
} else {
result.emplace_back(blob.name, blob.content_length);
@ -214,8 +270,12 @@ public:
BackupContainerFileSystem::FilesAndSizesT files = wait(self->listFiles());
filesToDelete = files.size();
}
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
client->delete_container(containerName).wait();
TraceEvent(SevDebug, "BCAzureBlobStoreDeleteContainer")
.detail("FilesToDelete", filesToDelete)
.detail("ContainerName", self->containerName)
.detail("TrackNumDeleted", pNumDeleted != nullptr);
wait(self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client] {
waitAzureFuture(client->delete_container(containerName), "delete_container");
return Void();
}));
if (pNumDeleted) {
@ -224,36 +284,44 @@ public:
return Void();
}
ACTOR static Future<Void> create(BackupContainerAzureBlobStore* self) {
state Future<Void> f1 =
self->asyncTaskThread.execAsync([containerName = self->containerName, client = self->client.get()] {
client->create_container(containerName).wait();
return Void();
});
state Future<Void> f2 = self->usesEncryption() ? self->encryptionSetupComplete() : Void();
return f1 && f2;
}
};
Future<bool> BackupContainerAzureBlobStore::blobExists(const std::string& fileName) {
return asyncTaskThread.execAsync(
[client = this->client.get(), containerName = this->containerName, fileName = fileName] {
auto resp = client->get_blob_properties(containerName, fileName).get().response();
return resp.valid();
});
TraceEvent(SevDebug, "BCAzureBlobStoreCheckExists")
.detail("FileName", fileName)
.detail("ContainerName", containerName);
return asyncTaskThread.execAsync([client = this->client, containerName = this->containerName, fileName = fileName] {
auto outcome = client->get_blob_properties(containerName, fileName).get();
if (outcome.success()) {
return true;
} else {
auto const& err = outcome.error();
if (err.code == notFoundErrorCode) {
return false;
} else {
printAzureError("get_blob_properties", err);
throw backup_error();
}
}
});
}
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const NetworkAddress& address,
BackupContainerAzureBlobStore::BackupContainerAzureBlobStore(const std::string& endpoint,
const std::string& accountName,
const std::string& containerName,
const Optional<std::string>& encryptionKeyFileName)
: containerName(containerName) {
setEncryptionKey(encryptionKeyFileName);
std::string accountKey = std::getenv("AZURE_KEY");
const char* _accountKey = std::getenv("AZURE_KEY");
if (!_accountKey) {
TraceEvent(SevError, "EnvironmentVariableNotFound").detail("EnvVariable", "AZURE_KEY");
// TODO: More descriptive error?
throw backup_error();
}
std::string accountKey = _accountKey;
auto credential = std::make_shared<azure::storage_lite::shared_key_credential>(accountName, accountKey);
auto storageAccount = std::make_shared<azure::storage_lite::storage_account>(
accountName, credential, false, format("http://%s/%s", address.toString().c_str(), accountName.c_str()));
accountName, credential, true, format("https://%s", endpoint.c_str()));
client = std::make_unique<AzureClient>(storageAccount, 1);
}
@ -265,12 +333,30 @@ void BackupContainerAzureBlobStore::delref() {
}
Future<Void> BackupContainerAzureBlobStore::create() {
return BackupContainerAzureBlobStoreImpl::create(this);
TraceEvent(SevDebug, "BCAzureBlobStoreCreateContainer").detail("ContainerName", containerName);
Future<Void> createContainerFuture =
asyncTaskThread.execAsync([containerName = this->containerName, client = this->client] {
waitAzureFuture(client->create_container(containerName), "create_container");
return Void();
});
Future<Void> encryptionSetupFuture = usesEncryption() ? encryptionSetupComplete() : Void();
return createContainerFuture && encryptionSetupFuture;
}
Future<bool> BackupContainerAzureBlobStore::exists() {
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client.get()] {
auto resp = client->get_container_properties(containerName).get().response();
return resp.valid();
TraceEvent(SevDebug, "BCAzureBlobStoreCheckContainerExists").detail("ContainerName", containerName);
return asyncTaskThread.execAsync([containerName = this->containerName, client = this->client] {
auto outcome = client->get_container_properties(containerName).get();
if (outcome.success()) {
return true;
} else {
auto const& err = outcome.error();
if (err.code == notFoundErrorCode) {
return false;
} else {
printAzureError("got_container_properties", err);
throw backup_error();
}
}
});
}
@ -285,22 +371,23 @@ Future<Reference<IBackupFile>> BackupContainerAzureBlobStore::writeFile(const st
Future<BackupContainerFileSystem::FilesAndSizesT> BackupContainerAzureBlobStore::listFiles(
const std::string& path,
std::function<bool(std::string const&)> folderPathFilter) {
return asyncTaskThread.execAsync([client = this->client.get(),
containerName = this->containerName,
path = path,
folderPathFilter = folderPathFilter] {
FilesAndSizesT result;
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
return result;
});
TraceEvent(SevDebug, "BCAzureBlobStoreListFiles").detail("ContainerName", containerName).detail("Path", path);
return asyncTaskThread.execAsync(
[client = this->client, containerName = this->containerName, path = path, folderPathFilter = folderPathFilter] {
FilesAndSizesT result;
BackupContainerAzureBlobStoreImpl::listFiles(client, containerName, path, folderPathFilter, result);
return result;
});
}
Future<Void> BackupContainerAzureBlobStore::deleteFile(const std::string& fileName) {
return asyncTaskThread.execAsync(
[containerName = this->containerName, fileName = fileName, client = client.get()]() {
client->delete_blob(containerName, fileName).wait();
return Void();
});
TraceEvent(SevDebug, "BCAzureBlobStoreDeleteFile")
.detail("ContainerName", containerName)
.detail("FileName", fileName);
return asyncTaskThread.execAsync([containerName = this->containerName, fileName = fileName, client = client]() {
client->delete_blob(containerName, fileName).wait();
return Void();
});
}
Future<Void> BackupContainerAzureBlobStore::deleteContainer(int* pNumDeleted) {
@ -313,5 +400,5 @@ Future<std::vector<std::string>> BackupContainerAzureBlobStore::listURLs(const s
}
std::string BackupContainerAzureBlobStore::getURLFormat() {
return "azure://<ip>:<port>/<accountname>/<container>/<path_to_file>";
return "azure://<accountname>@<endpoint>/<container>/";
}
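
Most of this file's changes funnel SDK futures through `waitAzureFuture`, which returns the payload on success and converts a failure into a printed error plus `backup_error()`, with a 404 treated specially by existence probes. The sketch below reproduces that shape with a mock outcome type; `Outcome`, `waitOutcome`, and the error strings are stand-ins, not the azure-storage-lite API.

```cpp
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>

// Stand-in for azure::storage_lite::storage_outcome<T>; illustrative only.
template <class T>
struct Outcome {
    bool ok;
    T value;
    std::string errorCode;
};

// Unwrap an async outcome: return the payload on success, otherwise surface
// the SDK error as an exception -- the shape of the diff's waitAzureFuture.
template <class T>
T waitOutcome(std::future<Outcome<T>>&& f, const std::string& operationName) {
    Outcome<T> outcome = f.get();
    if (outcome.ok)
        return outcome.value;
    std::cerr << "(" << operationName << ") : error " << outcome.errorCode << "\n";
    throw std::runtime_error("backup_error");
}

int main() {
    auto ok = std::async(std::launch::async, [] { return Outcome<int>{ true, 1337, "" }; });
    std::cout << "blob size: " << waitOutcome(std::move(ok), "get_blob_properties") << "\n";

    // A 404 is expected when probing existence, so callers inspect the code
    // before escalating -- as blobExists does with notFoundErrorCode.
    auto missing = std::async(std::launch::async, [] { return Outcome<int>{ false, 0, "404" }; });
    Outcome<int> probe = missing.get();
    std::cout << (probe.ok ? "exists" : probe.errorCode == "404" ? "not found" : "error") << "\n";
}
```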

View File

@ -33,7 +33,7 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
ReferenceCounted<BackupContainerAzureBlobStore> {
using AzureClient = azure::storage_lite::blob_client;
std::unique_ptr<AzureClient> client;
std::shared_ptr<AzureClient> client;
std::string containerName;
AsyncTaskThread asyncTaskThread;
@ -42,7 +42,7 @@ class BackupContainerAzureBlobStore final : public BackupContainerFileSystem,
friend class BackupContainerAzureBlobStoreImpl;
public:
BackupContainerAzureBlobStore(const NetworkAddress& address,
BackupContainerAzureBlobStore(const std::string& endpoint,
const std::string& accountName,
const std::string& containerName,
const Optional<std::string>& encryptionKeyFileName);

View File

@ -22,6 +22,9 @@
#ifndef FDBCLIENT_CLIENTLOGEVENTS_H
#define FDBCLIENT_CLIENTLOGEVENTS_H
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitProxyInterface.h"
namespace FdbClientLogEvents {
enum class EventType {
GET_VERSION_LATENCY = 0,
@ -252,7 +255,7 @@ struct EventCommit : public Event {
.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Mutation", mutation.toString());
.detail("Mutation", mutation);
}
TraceEvent("TransactionTrace_Commit")
@ -316,7 +319,7 @@ struct EventCommit_V2 : public Event {
.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Mutation", mutation.toString());
.detail("Mutation", mutation);
}
TraceEvent("TransactionTrace_Commit")
@ -430,7 +433,7 @@ struct EventCommitError : public Event {
.setMaxEventLength(-1)
.detail("TransactionID", id)
.setMaxFieldLength(maxFieldLength)
.detail("Mutation", mutation.toString());
.detail("Mutation", mutation);
}
TraceEvent("TransactionTrace_CommitError").detail("TransactionID", id).detail("ErrCode", errCode);

View File

@ -2355,7 +2355,7 @@ std::string getDRMutationStreamId(StatusObjectReader statusObj, const char* cont
}
}
}
TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", tagName.toString()).detail("Context", context);
TraceEvent(SevWarn, "DBA_TagNotPresentInStatus").detail("Tag", tagName).detail("Context", context);
throw backup_error();
} catch (std::runtime_error& e) {
TraceEvent(SevWarn, "DBA_GetDRMutationStreamIdFail").detail("Error", e.what());

View File

@ -77,7 +77,7 @@ void GlobalConfig::trigger(KeyRef key, std::function<void(std::optional<std::any
}
void GlobalConfig::insert(KeyRef key, ValueRef value) {
TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
// TraceEvent(SevInfo, "GlobalConfig_Insert").detail("Key", key).detail("Value", value);
data.erase(key);
Arena arena(key.expectedSize() + value.expectedSize());
@ -113,7 +113,7 @@ void GlobalConfig::erase(Key key) {
}
void GlobalConfig::erase(KeyRangeRef range) {
TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
// TraceEvent(SevInfo, "GlobalConfig_Erase").detail("Range", range);
auto it = data.begin();
while (it != data.end()) {
if (range.contains(it->first)) {
@ -167,7 +167,7 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
// attempt this migration at the same time, sometimes resulting in
// aborts due to conflicts. Purposefully avoid retrying, making this
// migration best-effort.
TraceEvent(SevInfo, "GlobalConfigMigrationError").detail("What", e.what());
TraceEvent(SevInfo, "GlobalConfig_MigrationError").detail("What", e.what());
}
return Void();
@ -176,7 +176,7 @@ ACTOR Future<Void> GlobalConfig::migrate(GlobalConfig* self) {
// Updates local copy of global configuration by reading the entire key-range
// from storage.
ACTOR Future<Void> GlobalConfig::refresh(GlobalConfig* self) {
TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
// TraceEvent trace(SevInfo, "GlobalConfig_Refresh");
self->erase(KeyRangeRef(""_sr, "\xff"_sr));
Transaction tr(self->cx);

View File

@ -108,7 +108,6 @@ public:
// the key.
template <typename T, typename std::enable_if<std::is_arithmetic<T>{}, bool>::type = true>
const T get(KeyRef name, T defaultVal) {
TraceEvent(SevInfo, "GlobalConfig_Get").detail("Key", name);
try {
auto configValue = get(name);
if (configValue.isValid()) {

View File

@ -4093,8 +4093,7 @@ SpanID generateSpanID(int transactionTracingEnabled) {
}
}
Transaction::Transaction()
: info(TaskPriority::DefaultEndpoint, generateSpanID(true)), span(info.spanID, "Transaction"_loc) {}
Transaction::Transaction() = default;
Transaction::Transaction(Database const& cx)
: info(cx->taskID, generateSpanID(cx->transactionTracingEnabled)), numErrors(0), options(cx),
@ -6340,7 +6339,7 @@ void enableClientInfoLogging() {
}
ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID) {
TraceEvent("SnapCreateEnter").detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID);
TraceEvent("SnapCreateEnter").detail("SnapCmd", snapCmd).detail("UID", snapUID);
try {
loop {
choose {
@ -6350,7 +6349,7 @@ ACTOR Future<Void> snapCreate(Database cx, Standalone<StringRef> snapCmd, UID sn
ProxySnapRequest(snapCmd, snapUID, snapUID),
cx->taskID,
AtMostOnce::True))) {
TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd.toString()).detail("UID", snapUID);
TraceEvent("SnapCreateExit").detail("SnapCmd", snapCmd).detail("UID", snapUID);
return Void();
}
}

View File

@ -180,6 +180,9 @@ struct TransactionInfo {
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
// Only available so that Transaction can have a default constructor, for use in state variables
TransactionInfo() : taskID(), spanID(), useProvisionalProxies() {}
explicit TransactionInfo(TaskPriority taskID, SpanID spanID)
: taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
};
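
The default constructor added to TransactionInfo (and the matching `Transaction::Transaction() = default` in the NativeAPI.actor.cpp hunk above) exists only so the type can live in a declared-then-assigned state variable. A compact illustration of the idiom, with made-up field names:

```cpp
#include <utility>

// Sketch of the pattern: give a type a cheap default constructor solely so it
// can sit in a declared-then-assigned "state" variable; the real constructor
// still does the meaningful initialization. Names are illustrative.
struct Info {
    int taskID;
    unsigned spanID;
    bool useProvisionalProxies;

    Info() : taskID(), spanID(), useProvisionalProxies() {}   // value-initialize everything
    explicit Info(int taskID, unsigned spanID)
        : taskID(taskID), spanID(spanID), useProvisionalProxies(false) {}
};

Info makeInfo() { return Info(7, 0xabcd); }

int main() {
    Info info;          // default-constructed placeholder, as in a state variable
    info = makeInfo();  // real value assigned once it is available
    return info.taskID == 7 ? 0 : 1;
}
```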

View File

@ -71,7 +71,7 @@ class SimpleConfigTransactionImpl {
if (reply.value.present()) {
return reply.value.get().toValue();
} else {
return {};
return Optional<Value>{};
}
}

View File

@ -476,10 +476,10 @@ void SpecialKeySpace::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& r
auto begin = writeImpls[range.begin];
auto end = writeImpls.rangeContainingKeyBefore(range.end)->value();
if (begin != end) {
TraceEvent(SevDebug, "SpecialKeySpaceCrossModuleClear").detail("Range", range.toString());
TraceEvent(SevDebug, "SpecialKeySpaceCrossModuleClear").detail("Range", range);
throw special_keys_cross_module_clear(); // ban cross module clear
} else if (begin == nullptr) {
TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound").detail("Range", range.toString());
TraceEvent(SevDebug, "SpecialKeySpaceNoWriteModuleFound").detail("Range", range);
throw special_keys_no_write_module_found();
}
return begin->clear(ryw, range);

View File

@ -131,8 +131,8 @@ void applyMetadataMutations(SpanID const& spanContext,
MutationRef privatized = m;
privatized.param1 = m.param1.withPrefix(systemKeys.begin, arena);
TraceEvent(SevDebug, "SendingPrivateMutation", dbgid)
.detail("Original", m.toString())
.detail("Privatized", privatized.toString())
.detail("Original", m)
.detail("Privatized", privatized)
.detail("Server", serverKeysDecodeServer(m.param1))
.detail("TagKey", serverTagKeyFor(serverKeysDecodeServer(m.param1)))
.detail("Tag", tag.toString());
@ -218,8 +218,8 @@ void applyMetadataMutations(SpanID const& spanContext,
(!m.param1.startsWith(failedLocalityPrefix) && m.param1 != failedLocalityVersionKey)) {
auto t = txnStateStore->readValue(m.param1).get();
TraceEvent("MutationRequiresRestart", dbgid)
.detail("M", m.toString())
.detail("PrevValue", t.present() ? t.get() : LiteralStringRef("(none)"))
.detail("M", m)
.detail("PrevValue", t.orDefault("(none)"_sr))
.detail("ToCommit", toCommit != nullptr);
confChange = true;
}
@ -431,7 +431,7 @@ void applyMetadataMutations(SpanID const& spanContext,
txnStateStore->clear(range & configKeys);
if (!excludedServersKeys.contains(range) && !failedServersKeys.contains(range) &&
!excludedLocalityKeys.contains(range) && !failedLocalityKeys.contains(range)) {
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString());
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m);
confChange = true;
}
}

View File

@ -744,7 +744,6 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
DEBUG_MUTATION("addMutation", message.version.version, m)
.detail("Version", message.version.toString())
.detail("Mutation", m)
.detail("KCV", self->minKnownCommittedVersion)
.detail("SavedVersion", self->savedVersion);

View File

@ -399,6 +399,14 @@ ACTOR Future<Void> releaseResolvingAfter(ProxyCommitData* self, Future<Void> rel
return Void();
}
ACTOR static Future<ResolveTransactionBatchReply> trackResolutionMetrics(Reference<Histogram> dist,
Future<ResolveTransactionBatchReply> in) {
state double startTime = now();
ResolveTransactionBatchReply reply = wait(in);
dist->sampleSeconds(now() - startTime);
return reply;
}
namespace CommitBatch {
struct CommitBatchContext {
@ -579,6 +587,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1); // Wait for local batch
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
double queuingDelay = g_network->now() - timeStart;
pProxyCommitData->stats.commitBatchQueuingDist->sampleSeconds(queuingDelay);
if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01))) &&
SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG && canReject(trs)) {
@ -619,6 +628,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->commitVersionRequestNumber++,
pProxyCommitData->mostRecentProcessedRequestNumber,
pProxyCommitData->dbgid);
state double beforeGettingCommitVersion = now();
GetCommitVersionReply versionReply = wait(brokenPromiseToNever(
pProxyCommitData->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)));
@ -626,6 +636,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
pProxyCommitData->stats.txnCommitVersionAssigned += trs.size();
pProxyCommitData->stats.lastCommitVersionAssigned = versionReply.version;
pProxyCommitData->stats.getCommitVersionDist->sampleSeconds(now() - beforeGettingCommitVersion);
self->commitVersion = versionReply.version;
self->prevVersion = versionReply.prevVersion;
@ -646,6 +657,7 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
}
ACTOR Future<Void> getResolution(CommitBatchContext* self) {
state double resolutionStart = now();
// Sending these requests is the fuzzy border between phase 1 and phase 2; it could conceivably overlap with
// resolution processing but is still using CPU
ProxyCommitData* pProxyCommitData = self->pProxyCommitData;
@ -674,8 +686,9 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<Future<ResolveTransactionBatchReply>> replies;
for (int r = 0; r < pProxyCommitData->resolvers.size(); r++) {
requests.requests[r].debugID = self->debugID;
replies.push_back(brokenPromiseToNever(
pProxyCommitData->resolvers[r].resolve.getReply(requests.requests[r], TaskPriority::ProxyResolverReply)));
replies.push_back(trackResolutionMetrics(pProxyCommitData->stats.resolverDist[r],
brokenPromiseToNever(pProxyCommitData->resolvers[r].resolve.getReply(
requests.requests[r], TaskPriority::ProxyResolverReply))));
}
self->transactionResolverMap.swap(requests.transactionResolverMap);
@ -700,6 +713,7 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
self->pProxyCommitData->stats.resolutionDist->sampleSeconds(now() - resolutionStart);
if (self->debugID.present()) {
g_traceBatch.addEvent(
"CommitDebug", self->debugID.get().first(), "CommitProxyServer.commitBatch.AfterResolution");
@ -939,10 +953,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
pProxyCommitData->singleKeyMutationEvent->log();
}
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
.detail("Dbgid", pProxyCommitData->dbgid)
.detail("To", tags)
.detail("Mutation", m);
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
.detail("To", tags);
self->toCommit.addTags(tags);
if (pProxyCommitData->cacheInfo[m.param1]) {
self->toCommit.addTag(cacheTag);
@ -955,10 +967,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
++firstRange;
if (firstRange == ranges.end()) {
// Fast path
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
.detail("Dbgid", pProxyCommitData->dbgid)
.detail("To", ranges.begin().value().tags)
.detail("Mutation", m);
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
.detail("To", ranges.begin().value().tags);
ranges.begin().value().populateTags();
self->toCommit.addTags(ranges.begin().value().tags);
@ -993,10 +1003,8 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
trCost->get().clearIdxCosts.pop_front();
}
}
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m)
.detail("Dbgid", pProxyCommitData->dbgid)
.detail("To", allSources)
.detail("Mutation", m);
DEBUG_MUTATION("ProxyCommit", self->commitVersion, m, pProxyCommitData->dbgid)
.detail("To", allSources);
self->toCommit.addTags(allSources);
}
@ -1055,6 +1063,7 @@ ACTOR Future<Void> assignMutationsToStorageServers(CommitBatchContext* self) {
}
ACTOR Future<Void> postResolution(CommitBatchContext* self) {
state double postResolutionStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state std::vector<CommitTransactionRequest>& trs = self->trs;
state const int64_t localBatchNumber = self->localBatchNumber;
@ -1064,6 +1073,8 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
bool queuedCommits = pProxyCommitData->latestLocalCommitBatchLogging.get() < localBatchNumber - 1;
TEST(queuedCommits); // Queuing post-resolution commit processing
wait(pProxyCommitData->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber - 1));
state double postResolutionQueuing = now();
pProxyCommitData->stats.postResolutionDist->sampleSeconds(postResolutionQueuing - postResolutionStart);
wait(yield(TaskPriority::ProxyCommitYield1));
self->computeStart = g_network->timer();
@ -1212,10 +1223,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
pProxyCommitData->stats.processingMutationDist->sampleSeconds(now() - postResolutionQueuing);
return Void();
}
ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
state double tLoggingStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:transactionLogging"_loc, self->span.context);
@ -1249,11 +1262,12 @@ ACTOR Future<Void> transactionLogging(CommitBatchContext* self) {
pProxyCommitData->txsPopVersions.emplace_back(self->commitVersion, self->msg.popTo);
}
pProxyCommitData->logSystem->popTxs(self->msg.popTo);
pProxyCommitData->stats.tlogLoggingDist->sampleSeconds(now() - tLoggingStart);
return Void();
}
ACTOR Future<Void> reply(CommitBatchContext* self) {
state double replyStart = now();
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
state Span span("MP:reply"_loc, self->span.context);
@ -1385,7 +1399,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
pProxyCommitData->commitBatchesMemBytesCount -= self->currentBatchMemBytesCount;
ASSERT_ABORT(pProxyCommitData->commitBatchesMemBytesCount >= 0);
wait(self->releaseFuture);
pProxyCommitData->stats.replyCommitDist->sampleSeconds(now() - replyStart);
return Void();
}
@ -1523,7 +1537,7 @@ ACTOR static Future<Void> rejoinServer(CommitProxyInterface proxy, ProxyCommitDa
// We can't respond to these requests until we have valid txnStateStore
wait(commitData->validState.getFuture());
TraceEvent("ProxyReadyForReads", proxy.id());
TraceEvent("ProxyReadyForReads", proxy.id()).log();
loop {
GetStorageServerRejoinInfoRequest req = waitNext(proxy.getStorageServerRejoinInfo.getFuture());
@ -1856,7 +1870,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
commitData.resolvers = commitData.db->get().resolvers;
ASSERT(commitData.resolvers.size() != 0);
for (int i = 0; i < commitData.resolvers.size(); ++i) {
commitData.stats.resolverDist.push_back(Histogram::getHistogram(
LiteralStringRef("CommitProxy"), "ToResolver_" + commitData.resolvers[i].id().toString(), Histogram::Unit::microseconds));
}
auto rs = commitData.keyResolvers.modify(allKeys);
for (auto r = rs.begin(); r != rs.end(); ++r)
r->value().emplace_back(0, 0);

View File

@ -57,6 +57,8 @@ struct GrvProxyStats {
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
Reference<Histogram> grvConfirmEpochLiveDist;
Reference<Histogram> grvGetCommittedVersionRpcDist;
void updateRequestBuckets() {
while (now() - lastBucketBegin > bucketInterval) {
@ -112,7 +114,13 @@ struct GrvProxyStats {
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS) {
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
grvConfirmEpochLiveDist(Histogram::getHistogram(LiteralStringRef("GrvProxy"),
LiteralStringRef("GrvConfirmEpochLive"),
Histogram::Unit::microseconds)),
grvGetCommittedVersionRpcDist(Histogram::getHistogram(LiteralStringRef("GrvProxy"),
LiteralStringRef("GrvGetCommittedVersionRpc"),
Histogram::Unit::microseconds)) {
// The rate at which the limit (budget) is allowed to grow.
specialCounter(cc, "SystemGRVQueueSize", [this]() { return this->systemGRVQueueSize; });
specialCounter(cc, "DefaultGRVQueueSize", [this]() { return this->defaultGRVQueueSize; });
@ -526,6 +534,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
// and no other proxy could have already committed anything without first ending the epoch
state Span span("GP:getLiveCommittedVersion"_loc, parentSpan);
++grvProxyData->stats.txnStartBatch;
state double grvStart = now();
state Future<GetRawCommittedVersionReply> replyFromMasterFuture;
replyFromMasterFuture = grvProxyData->master.getLiveCommittedVersion.getReply(
GetRawCommittedVersionRequest(span.context, debugID), TaskPriority::GetLiveCommittedVersionReply);
@ -537,6 +547,8 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
wait(grvProxyData->lastCommitTime.whenAtLeast(now() - SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION));
}
state double grvConfirmEpochLive = now();
grvProxyData->stats.grvConfirmEpochLiveDist->sampleSeconds(grvConfirmEpochLive - grvStart);
if (debugID.present()) {
g_traceBatch.addEvent(
"TransactionDebug", debugID.get().first(), "GrvProxyServer.getLiveCommittedVersion.confirmEpochLive");
@ -546,6 +558,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan,
grvProxyData->minKnownCommittedVersion =
std::max(grvProxyData->minKnownCommittedVersion, repFromMaster.minKnownCommittedVersion);
grvProxyData->stats.grvGetCommittedVersionRpcDist->sampleSeconds(now() - grvConfirmEpochLive);
GetReadVersionReply rep;
rep.version = repFromMaster.version;
rep.locked = repFromMaster.locked;
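
Every histogram added in this commit follows the same three-step shape: fetch a named histogram once, take a timestamp before the awaited step, then sample the elapsed seconds. A condensed sketch using only the API visible in this diff (the group/op names and the delay stand-in are placeholders):

ACTOR Future<Void> timedStepSketch() {
	state Reference<Histogram> dist = Histogram::getHistogram(
	    LiteralStringRef("ExampleGroup"), LiteralStringRef("ExampleStep"), Histogram::Unit::microseconds);
	state double start = now();
	wait(delay(0.01)); // stand-in for the RPC / queueing step being measured
	dist->sampleSeconds(now() - start);
	return Void();
}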

View File

@ -31,6 +31,7 @@
#include "fdbserver/MutationTracking.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/Histogram.h"
#include "flow/IndexedSet.h"
#include "flow/Knobs.h"
#include "fdbrpc/ReplicationPolicy.h"
@ -57,6 +58,7 @@ public:
std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> logRouters;
std::vector<Reference<AsyncVar<OptionalInterface<BackupInterface>>>> backupWorkers;
std::vector<Reference<ConnectionResetInfo>> connectionResetTrackers;
std::vector<Reference<Histogram>> tlogPushDistTrackers;
int32_t tLogWriteAntiQuorum;
int32_t tLogReplicationFactor;
std::vector<LocalityData> tLogLocalities; // Stores the localities of the log servers

View File

@ -133,8 +133,7 @@ void ILogSystem::ServerPeekCursor::nextMessage() {
}
messageAndTags.loadFromArena(&rd, &messageVersion.sub);
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage())
.detail("CursorID", this->randomID);
DEBUG_TAGS_AND_MESSAGE("ServerPeekCursor", messageVersion.version, messageAndTags.getRawMessage(), this->randomID);
// Rewind and consume the header so that reader() starts from the message.
rd.rewind();
rd.readBytes(messageAndTags.getHeaderSize());

View File

@ -18,6 +18,7 @@
* limitations under the License.
*/
#include <algorithm>
#include <vector>
#include "fdbserver/MutationTracking.h"
#include "fdbserver/LogProtocolMessage.h"
@ -27,43 +28,35 @@
#error "You cannot use mutation tracking in a clean/release build."
#endif
// Track up to 2 keys in simulation via enabling MUTATION_TRACKING_ENABLED and setting the keys here.
StringRef debugKey = LiteralStringRef("");
StringRef debugKey2 = LiteralStringRef("\xff\xff\xff\xff");
// Track any of these keys in simulation via enabling MUTATION_TRACKING_ENABLED and setting the keys here.
std::vector<KeyRef> debugKeys = { ""_sr, "\xff\xff"_sr };
TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation) {
if ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) &&
((mutation.param1 <= debugKey && mutation.param2 > debugKey) ||
(mutation.param1 <= debugKey2 && mutation.param2 > debugKey2))) {
TraceEvent event("MutationTracking");
event.detail("At", context)
.detail("Version", version)
.detail("MutationType", typeString[mutation.type])
.detail("KeyBegin", mutation.param1)
.detail("KeyEnd", mutation.param2);
TraceEvent debugMutationEnabled(const char* context, Version version, MutationRef const& mutation, UID id) {
if (std::any_of(debugKeys.begin(), debugKeys.end(), [&mutation](const KeyRef& debugKey) {
return ((mutation.type == mutation.ClearRange || mutation.type == mutation.DebugKeyRange) &&
mutation.param1 <= debugKey && mutation.param2 > debugKey) ||
mutation.param1 == debugKey;
})) {
TraceEvent event("MutationTracking", id);
event.detail("At", context).detail("Version", version).detail("Mutation", mutation);
return event;
} else if (mutation.param1 == debugKey || mutation.param1 == debugKey2) {
TraceEvent event("MutationTracking");
event.detail("At", context)
.detail("Version", version)
.detail("MutationType", typeString[mutation.type])
.detail("Key", mutation.param1)
.detail("Value", mutation.param2);
return event;
} else {
return TraceEvent();
}
return TraceEvent();
}
TraceEvent debugKeyRangeEnabled(const char* context, Version version, KeyRangeRef const& keys) {
if (keys.contains(debugKey) || keys.contains(debugKey2)) {
return debugMutation(context, version, MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end));
} else {
return TraceEvent();
TraceEvent debugKeyRangeEnabled(const char* context, Version version, KeyRangeRef const& keys, UID id) {
if (std::any_of(
debugKeys.begin(), debugKeys.end(), [&keys](const KeyRef& debugKey) { return keys.contains(debugKey); })) {
TraceEvent event("MutationTracking", id);
event.detail("At", context).detail("Version", version).detail("Mutation", MutationRef(MutationRef::DebugKeyRange, keys.begin, keys.end));
return event;
}
return TraceEvent();
}
TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, StringRef commitBlob) {
TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, StringRef commitBlob, UID id) {
BinaryReader rdr(commitBlob, AssumeVersion(g_network->protocolVersion()));
while (!rdr.empty()) {
if (*(int32_t*)rdr.peekBytes(4) == VERSION_HEADER) {
@ -93,7 +86,7 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
MutationRef m;
BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion()));
br >> m;
TraceEvent event = debugMutation(context, version, m);
TraceEvent event = debugMutation(context, version, m, id);
if (event.isEnabled()) {
event.detail("MessageTags", msg.tags);
return event;
@ -104,23 +97,23 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri
}
#if MUTATION_TRACKING_ENABLED
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation) {
return debugMutationEnabled(context, version, mutation);
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id) {
return debugMutationEnabled(context, version, mutation, id);
}
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys) {
return debugKeyRangeEnabled(context, version, keys);
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id) {
return debugKeyRangeEnabled(context, version, keys, id);
}
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob) {
return debugTagsAndMessageEnabled(context, version, commitBlob);
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id) {
return debugTagsAndMessageEnabled(context, version, commitBlob, id);
}
#else
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation) {
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id) {
return TraceEvent();
}
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys) {
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id) {
return TraceEvent();
}
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob) {
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id) {
return TraceEvent();
}
#endif
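
The rewrite replaces the two hard-coded keys with a vector plus a std::any_of predicate: a mutation matches if it is a range covering a tracked key or a point write to one. A standalone sketch of that predicate over plain strings (the key values are placeholders):

#include <algorithm>
#include <string>
#include <vector>

bool mutationMatches(const std::string& param1, const std::string& param2, bool isRange,
                     const std::vector<std::string>& debugKeys) {
	return std::any_of(debugKeys.begin(), debugKeys.end(), [&](const std::string& k) {
		return (isRange && param1 <= k && param2 > k) || param1 == k;
	});
}

int main() {
	const std::vector<std::string> debugKeys = { "apple", "\xff\xff" };
	return mutationMatches("a", "b", /*isRange=*/true, debugKeys) ? 0 : 1; // "apple" is inside [a, b)
}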

View File

@ -28,19 +28,18 @@
#define MUTATION_TRACKING_ENABLED 0
// The keys to track are defined in the .cpp file to limit recompilation.
#define DEBUG_MUTATION(context, version, mutation) MUTATION_TRACKING_ENABLED&& debugMutation(context, version, mutation)
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation);
#define DEBUG_MUTATION(...) MUTATION_TRACKING_ENABLED && debugMutation(__VA_ARGS__)
TraceEvent debugMutation(const char* context, Version version, MutationRef const& mutation, UID id = UID());
// debugKeyRange and debugTagsAndMessage only log the *first* occurrence of a key in their range/commit.
// TODO: Create a TraceEventGroup that forwards all calls to each element of a vector<TraceEvent>,
// to allow "multiple" TraceEvents to be returned.
#define DEBUG_KEY_RANGE(context, version, keys) MUTATION_TRACKING_ENABLED&& debugKeyRange(context, version, keys)
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys);
#define DEBUG_KEY_RANGE(...) MUTATION_TRACKING_ENABLED && debugKeyRange(__VA_ARGS__)
TraceEvent debugKeyRange(const char* context, Version version, KeyRangeRef const& keys, UID id = UID());
#define DEBUG_TAGS_AND_MESSAGE(context, version, commitBlob) \
MUTATION_TRACKING_ENABLED&& debugTagsAndMessage(context, version, commitBlob)
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob);
#define DEBUG_TAGS_AND_MESSAGE(...) MUTATION_TRACKING_ENABLED && debugTagsAndMessage(__VA_ARGS__)
TraceEvent debugTagsAndMessage(const char* context, Version version, StringRef commitBlob, UID id = UID());
// TODO: Version Tracking. If the bug is in handling a version rather than a key, then it'd be good to be able to log
// each time that version is handled within simulation. A similar set of functions should be implemented.
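
The variadic macros above lean on `&&` short-circuiting: when MUTATION_TRACKING_ENABLED is 0, the call (and any chained .detail()s, which bind tighter than &&) compiles but never runs. A self-contained sketch of the idiom, with Event and DEBUG_EV as illustrative stand-ins for TraceEvent and DEBUG_MUTATION:

#include <cstdio>

#define TRACKING_ENABLED 0

struct Event {
	Event& detail(const char* k, int v) {
		std::printf("  %s=%d\n", k, v);
		return *this;
	}
	explicit operator bool() const { return true; } // contextual conversion for &&
};

Event makeEvent(const char* name) {
	std::printf("event %s\n", name);
	return Event();
}

#define DEBUG_EV(...) TRACKING_ENABLED && makeEvent(__VA_ARGS__)

int main() {
	DEBUG_EV("Example").detail("Version", 7); // prints nothing while TRACKING_ENABLED is 0
}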

View File

@ -74,6 +74,15 @@ struct ProxyStats {
int64_t maxComputeNS;
int64_t minComputeNS;
Reference<Histogram> commitBatchQueuingDist;
Reference<Histogram> getCommitVersionDist;
std::vector<Reference<Histogram>> resolverDist;
Reference<Histogram> resolutionDist;
Reference<Histogram> postResolutionDist;
Reference<Histogram> processingMutationDist;
Reference<Histogram> tlogLoggingDist;
Reference<Histogram> replyCommitDist;
int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;
@ -113,7 +122,28 @@ struct ProxyStats {
id,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
maxComputeNS(0), minComputeNS(1e12) {
maxComputeNS(0), minComputeNS(1e12),
commitBatchQueuingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("CommitBatchQueuing"),
Histogram::Unit::microseconds)),
getCommitVersionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("GetCommitVersion"),
Histogram::Unit::microseconds)),
resolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("Resolution"),
Histogram::Unit::microseconds)),
postResolutionDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("PostResolutionQueuing"),
Histogram::Unit::microseconds)),
processingMutationDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ProcessingMutation"),
Histogram::Unit::microseconds)),
tlogLoggingDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("TlogLogging"),
Histogram::Unit::microseconds)),
replyCommitDist(Histogram::getHistogram(LiteralStringRef("CommitProxy"),
LiteralStringRef("ReplyCommit"),
Histogram::Unit::microseconds)) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });

View File

@ -63,8 +63,7 @@ struct StagingKey {
ASSERT(m.type != MutationRef::SetVersionstampedKey && m.type != MutationRef::SetVersionstampedValue);
DEBUG_MUTATION("StagingKeyAdd", newVersion.version, m)
.detail("Version", version.toString())
.detail("NewVersion", newVersion.toString())
.detail("Mutation", m);
.detail("NewVersion", newVersion.toString());
if (version == newVersion) {
// This could happen because the same mutation can be present in
// overlapping mutation logs, because new TLogs can copy mutations

View File

@ -892,8 +892,7 @@ ACTOR Future<Void> sendMutationsToApplier(
MutationRef mutation = mvector[splitMutationIndex];
UID applierID = nodeIDs[splitMutationIndex];
DEBUG_MUTATION("RestoreLoaderSplittedMutation", commitVersion.version, mutation)
.detail("Version", commitVersion.toString())
.detail("Mutation", mutation);
.detail("Version", commitVersion.toString());
// CAREFUL: The split mutations' lifetime is shorter than the for-loop
// Must use deep copy for split mutations
applierVersionedMutationsBuffer[applierID].push_back_deep(
@ -996,7 +995,7 @@ void splitMutation(const KeyRangeMap<UID>& krMap,
VectorRef<MutationRef>& mvector,
Arena& nodeIDs_arena,
VectorRef<UID>& nodeIDs) {
TraceEvent(SevDebug, "FastRestoreSplitMutation").detail("Mutation", m.toString());
TraceEvent(SevDebug, "FastRestoreSplitMutation").detail("Mutation", m);
ASSERT(mvector.empty());
ASSERT(nodeIDs.empty());
auto r = krMap.intersectingRanges(KeyRangeRef(m.param1, m.param2));
@ -1527,15 +1526,15 @@ TEST_CASE("/FastRestore/RestoreLoader/splitMutation") {
KeyRange krange(KeyRangeRef(result.param1, result.param2));
KeyRange krange2(KeyRangeRef(result2.param1, result2.param2));
TraceEvent("Result")
.detail("KeyRange1", krange.toString())
.detail("KeyRange2", krange2.toString())
.detail("KeyRange1", krange)
.detail("KeyRange2", krange2)
.detail("ApplierID1", applierID)
.detail("ApplierID2", applierID2);
if (krange != krange2 || applierID != applierID2) {
TraceEvent(SevError, "IncorrectResult")
.detail("Mutation", mutation.toString())
.detail("KeyRange1", krange.toString())
.detail("KeyRange2", krange2.toString())
.detail("Mutation", mutation)
.detail("KeyRange1", krange)
.detail("KeyRange2", krange2)
.detail("ApplierID1", applierID)
.detail("ApplierID2", applierID2);
}

View File

@ -1582,7 +1582,7 @@ void CacheRangeInfo::addMutation(Version version, MutationRef const& mutation) {
// even allow them on un-assigned range?)
TraceEvent(SevError, "DeliveredToNotAssigned")
.detail("Version", version)
.detail("Mutation", mutation.toString());
.detail("Mutation", mutation);
ASSERT(false); // Mutation delivered to notAssigned cacheRange!
}
}
@ -1719,7 +1719,7 @@ public:
DEBUG_MUTATION("SCUpdateMutation", ver, m);
if (m.param1.startsWith(systemKeys.end)) {
//TraceEvent("SCPrivateData", data->thisServerID).detail("Mutation", m.toString()).detail("Version", ver);
//TraceEvent("SCPrivateData", data->thisServerID).detail("Mutation", m).detail("Version", ver);
applyPrivateCacheData(data, m);
} else {
splitMutation(data, data->cachedRangeMap, m, ver);
@ -2011,7 +2011,7 @@ ACTOR Future<Void> pullAsyncData(StorageCacheData* data) {
}
} else {
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID)
.detail("Mutation", msg.toString())
.detail("Mutation", msg)
.detail("CursorVersion", cloneCursor2->version().version)
.detail("DataVersion", data->version.get());
}

View File

@ -1423,9 +1423,8 @@ void commitMessages(TLogData* self,
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
}
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage())
.detail("UID", self->dbgid)
.detail("LogId", logData->logId);
DEBUG_TAGS_AND_MESSAGE("TLogCommitMessages", version, msg.getRawMessage(), logData->logId)
.detail("DebugID", self->dbgid);
block.append(block.arena(), msg.message.begin(), msg.message.size());
for (auto tag : msg.tags) {
if (logData->locality == tagLocalitySatellite) {
@ -1557,8 +1556,7 @@ void peekMessagesFromMemory(Reference<LogData> self,
messages << it->second.toStringRef();
void* data = messages.getData();
DEBUG_TAGS_AND_MESSAGE(
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset))
.detail("LogId", self->logId)
"TLogPeek", currentVersion, StringRef((uint8_t*)data + offset, messages.getLength() - offset), self->logId)
.detail("PeekTag", tag);
}
}
@ -1837,9 +1835,8 @@ Future<Void> tLogPeekMessages(PromiseType replyPromise,
wait(parseMessagesForTag(entry.messages, reqTag, logData->logRouterTags));
for (const StringRef& msg : rawMessages) {
messages.serializeBytes(msg);
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg)
.detail("UID", self->dbgid)
.detail("LogId", logData->logId)
DEBUG_TAGS_AND_MESSAGE("TLogPeekFromDisk", entry.version, msg, logData->logId)
.detail("DebugID", self->dbgid)
.detail("PeekTag", reqTag);
}

View File

@ -527,6 +527,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
}
ACTOR static Future<TLogCommitReply> recordPushMetrics(Reference<ConnectionResetInfo> self,
Reference<Histogram> dist,
NetworkAddress addr,
Future<TLogCommitReply> in) {
state double startTime = now();
@ -541,6 +542,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
self->fastReplies++;
}
}
dist->sampleSeconds(now() - startTime);
return t;
}
@ -563,12 +565,21 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
it->connectionResetTrackers.push_back(makeReference<ConnectionResetInfo>());
}
}
if (it->tlogPushDistTrackers.empty()) {
for (int i = 0; i < it->logServers.size(); i++) {
it->tlogPushDistTrackers.push_back(
Histogram::getHistogram("ToTlog_" + it->logServers[i]->get().interf().uniqueID.toString(),
it->logServers[i]->get().interf().address().toString(),
Histogram::Unit::microseconds));
}
}
vector<Future<Void>> tLogCommitResults;
for (int loc = 0; loc < it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
data.recordEmptyMessage(location, msg);
allReplies.push_back(recordPushMetrics(
it->connectionResetTrackers[loc],
it->tlogPushDistTrackers[loc],
it->logServers[loc]->get().interf().address(),
it->logServers[loc]->get().interf().commit.getReply(TLogCommitRequest(spanContext,
msg.arena(),

View File

@ -1267,14 +1267,16 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
DEBUG_MUTATION("ShardGetValue",
version,
MutationRef(MutationRef::DebugKey, req.key, v.present() ? v.get() : LiteralStringRef("<null>")));
MutationRef(MutationRef::DebugKey, req.key, v.present() ? v.get() : LiteralStringRef("<null>")),
data->thisServerID);
DEBUG_MUTATION("ShardGetPath",
version,
MutationRef(MutationRef::DebugKey,
req.key,
path == 0 ? LiteralStringRef("0")
: path == 1 ? LiteralStringRef("1")
: LiteralStringRef("2")));
: LiteralStringRef("2")),
data->thisServerID);
/*
StorageMetrics m;
@ -1382,7 +1384,8 @@ ACTOR Future<Version> watchWaitForValueChange(StorageServer* data, SpanID parent
latest,
MutationRef(MutationRef::DebugKey,
metadata->key,
reply.value.present() ? StringRef(reply.value.get()) : LiteralStringRef("<null>")));
reply.value.present() ? StringRef(reply.value.get()) : LiteralStringRef("<null>")),
data->thisServerID);
if (metadata->debugID.present())
g_traceBatch.addEvent(
@ -1954,7 +1957,7 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
throw wrong_shard_server();
}
state int offset1;
state int offset1 = 0;
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
? Future<Key>(req.begin.getKey())
@ -2120,7 +2123,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
throw wrong_shard_server();
}
state int offset1;
state int offset1 = 0;
state int offset2;
state Future<Key> fBegin = req.begin.isFirstGreaterOrEqual()
? Future<Key>(req.begin.getKey())
@ -2835,7 +2838,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
wait(data->coreStarted.getFuture() && delay(0));
try {
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys);
DEBUG_KEY_RANGE("fetchKeysBegin", data->version.get(), shard->keys, data->thisServerID);
TraceEvent(SevDebug, interval.begin(), data->thisServerID)
.detail("KeyBegin", shard->keys.begin)
@ -2883,7 +2886,6 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
// Get the history
state int debug_getRangeRetries = 0;
state int debug_nextRetryToLog = 1;
state bool isTooOld = false;
// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server
// we must refresh the cache manually.
@ -2927,9 +2929,13 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
.detail("Last", this_block.size() ? this_block.end()[-1].key : std::string())
.detail("Version", fetchVersion)
.detail("More", this_block.more);
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys);
for (auto k = this_block.begin(); k != this_block.end(); ++k)
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value));
DEBUG_KEY_RANGE("fetchRange", fetchVersion, keys, data->thisServerID);
if(MUTATION_TRACKING_ENABLED) {
for (auto k = this_block.begin(); k != this_block.end(); ++k) {
DEBUG_MUTATION("fetch", fetchVersion, MutationRef(MutationRef::SetValue, k->key, k->value), data->thisServerID);
}
}
metricReporter.addFetchedBytes(expectedBlockSize, this_block.size());
@ -3092,8 +3098,11 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
for (auto b = batch->changes.begin() + startSize; b != batch->changes.end(); ++b) {
ASSERT(b->version >= checkv);
checkv = b->version;
for (auto& m : b->mutations)
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m);
if(MUTATION_TRACKING_ENABLED) {
for (auto& m : b->mutations) {
DEBUG_MUTATION("fetchKeysFinalCommitInject", batch->changes[0].version, m, data->thisServerID);
}
}
}
shard->updates.clear();
@ -3195,9 +3204,7 @@ void ShardInfo::addMutation(Version version, MutationRef const& mutation) {
else if (readWrite)
readWrite->addMutation(version, mutation, this->keys, readWrite->updateEagerReads);
else if (mutation.type != MutationRef::ClearRange) {
TraceEvent(SevError, "DeliveredToNotAssigned")
.detail("Version", version)
.detail("Mutation", mutation.toString());
TraceEvent(SevError, "DeliveredToNotAssigned").detail("Version", version).detail("Mutation", mutation);
ASSERT(false); // Mutation delivered to notAssigned shard!
}
}
@ -3221,7 +3228,7 @@ void changeServerKeys(StorageServer* data,
validate(data);
// TODO(alexmiller): Figure out how to selectively enable spammy data distribution events.
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys);
DEBUG_KEY_RANGE(nowAssigned ? "KeysAssigned" : "KeysUnassigned", version, keys, data->thisServerID);
bool isDifferent = false;
auto existingShards = data->shards.intersectingRanges(keys);
@ -3329,7 +3336,7 @@ void changeServerKeys(StorageServer* data,
void rollback(StorageServer* data, Version rollbackVersion, Version nextVersion) {
TEST(true); // call to shard rollback
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys);
DEBUG_KEY_RANGE("Rollback", rollbackVersion, allKeys, data->thisServerID);
// We used to do a complicated dance to roll back in MVCC history. It's much simpler, and more testable,
// to simply restart the storage server actor and restore from the persistent disk state, and then roll
@ -3354,8 +3361,7 @@ void StorageServer::addMutation(Version version,
return;
}
expanded = addMutationToMutationLog(mLog, expanded);
DEBUG_MUTATION("applyMutation", version, expanded)
.detail("UID", thisServerID)
DEBUG_MUTATION("applyMutation", version, expanded, thisServerID)
.detail("ShardBegin", shard.begin)
.detail("ShardEnd", shard.end);
applyMutation(this, expanded, mLog.arena(), mutableData());
@ -3426,7 +3432,7 @@ public:
} else {
// FIXME: enable when DEBUG_MUTATION is active
// for(auto m = changes[c].mutations.begin(); m; ++m) {
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m);
// DEBUG_MUTATION("SSUpdateMutation", changes[c].version, *m, data->thisServerID);
//}
splitMutation(data, data->shards, m, ver);
@ -3450,7 +3456,7 @@ private:
bool processedCacheStartKey;
void applyPrivateData(StorageServer* data, MutationRef const& m) {
TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m.toString());
TraceEvent(SevDebug, "SSPrivateMutation", data->thisServerID).detail("Mutation", m);
if (processedStartKey) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
@ -3556,7 +3562,7 @@ private:
}
void applyPrivateCacheData(StorageServer* data, MutationRef const& m) {
//TraceEvent(SevDebug, "SSPrivateCacheMutation", data->thisServerID).detail("Mutation", m.toString());
//TraceEvent(SevDebug, "SSPrivateCacheMutation", data->thisServerID).detail("Mutation", m);
if (processedCacheStartKey) {
// Because of the implementation of the krm* functions, we expect changes in pairs, [begin,end)
@ -3690,7 +3696,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
if (LogProtocolMessage::isNextIn(cloneReader)) {
LogProtocolMessage lpm;
cloneReader >> lpm;
//TraceEvent(SevDebug, "SSReadingLPM", data->thisServerID).detail("Mutation", lpm.toString());
//TraceEvent(SevDebug, "SSReadingLPM", data->thisServerID).detail("Mutation", lpm);
dbgLastMessageWasProtocol = true;
cloneCursor1->setProtocolVersion(cloneReader.protocolVersion());
} else if (cloneReader.protocolVersion().hasSpanContext() &&
@ -3700,7 +3706,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
} else {
MutationRef msg;
cloneReader >> msg;
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg.toString());
// TraceEvent(SevDebug, "SSReadingLog", data->thisServerID).detail("Mutation", msg);
if (firstMutation && msg.param1.startsWith(systemKeys.end))
hasPrivateData = true;
@ -3825,7 +3831,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff) &&
deterministicRandom()->random01() < 0.05) {
TraceEvent(SevWarnAlways, "TSSInjectDropMutation", data->thisServerID)
.detail("Mutation", msg.toString())
.detail("Mutation", msg)
.detail("Version", cloneCursor2->version().toString());
} else if (data->isTSSInQuarantine() &&
(msg.param1.size() < 2 || msg.param1[0] != 0xff || msg.param1[1] != 0xff)) {
@ -3833,11 +3839,13 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
.suppressFor(10.0)
.detail("Version", cloneCursor2->version().toString());
} else if (ver != invalidVersion) { // This change belongs to a version < minVersion
DEBUG_MUTATION("SSPeek", ver, msg).detail("ServerID", data->thisServerID);
DEBUG_MUTATION("SSPeek", ver, msg, data->thisServerID);
if (ver == 1) {
TraceEvent("SSPeekMutation", data->thisServerID).log();
//TraceEvent("SSPeekMutation", data->thisServerID).log();
// The following trace event may produce a value with special characters
//TraceEvent("SSPeekMutation", data->thisServerID).detail("Mutation", msg.toString()).detail("Version", cloneCursor2->version().toString());
TraceEvent("SSPeekMutation", data->thisServerID)
.detail("Mutation", msg)
.detail("Version", cloneCursor2->version().toString());
}
updater.applyMutation(data, msg, ver);
@ -3868,7 +3876,7 @@ ACTOR Future<Void> update(StorageServer* data, bool* pReceivedUpdate) {
}
} else
TraceEvent(SevError, "DiscardingPeekedData", data->thisServerID)
.detail("Mutation", msg.toString())
.detail("Mutation", msg)
.detail("Version", cloneCursor2->version().toString());
}
}
@ -4145,7 +4153,6 @@ void StorageServerDisk::writeKeyValue(KeyValueRef kv) {
}
void StorageServerDisk::writeMutation(MutationRef mutation) {
// FIXME: DEBUG_MUTATION(debugContext, debugVersion, *m);
if (mutation.type == MutationRef::SetValue) {
storage->set(KeyValueRef(mutation.param1, mutation.param2));
} else if (mutation.type == MutationRef::ClearRange) {
@ -4158,7 +4165,7 @@ void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations,
Version debugVersion,
const char* debugContext) {
for (const auto& m : mutations) {
DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
DEBUG_MUTATION(debugContext, debugVersion, m, data->thisServerID);
if (m.type == MutationRef::SetValue) {
storage->set(KeyValueRef(m.param1, m.param2));
} else if (m.type == MutationRef::ClearRange) {
@ -4943,8 +4950,8 @@ ACTOR Future<Void> reportStorageServerState(StorageServer* self) {
TraceEvent(level, "FetchKeyCurrentStatus")
.detail("Timestamp", now())
.detail("LongestRunningTime", longestRunningFetchKeys.first)
.detail("StartKey", longestRunningFetchKeys.second.begin.printable())
.detail("EndKey", longestRunningFetchKeys.second.end.printable())
.detail("StartKey", longestRunningFetchKeys.second.begin)
.detail("EndKey", longestRunningFetchKeys.second.end)
.detail("NumRunning", numRunningFetchKeys);
}
}

View File

@ -663,16 +663,16 @@ struct ConsistencyCheckWorkload : TestWorkload {
.detail(format("StorageServer%d", j).c_str(), iter_ss[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
iter_ss[firstValidServer].toString())
.detail("ShardBegin", printable(req.begin.getKey()))
.detail("ShardEnd", printable(req.end.getKey()))
.detail("ShardBegin", req.begin.getKey())
.detail("ShardEnd", req.end.getKey())
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), printable(currentUniqueKey))
.detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey)
.detail(format("Server%dUniques", firstValidServer).c_str(), referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
printable(referenceUniqueKey))
referenceUniqueKey)
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", printable(valueMismatchKey))
.detail("ValueMismatchKey", valueMismatchKey)
.detail("MatchingKVPairs", matchingKVPairs);
self->testFailure("Data inconsistent", true);
@ -718,7 +718,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (bytesReadInRange > 0) {
TraceEvent("CacheConsistencyCheck_ReadRange")
.suppressFor(1.0)
.detail("Range", printable(iter->range()))
.detail("Range", iter->range())
.detail("BytesRead", bytesReadInRange);
}
}
@ -1083,8 +1083,8 @@ struct ConsistencyCheckWorkload : TestWorkload {
TraceEvent("ConsistencyCheck_InconsistentStorageMetrics")
.detail("ByteEstimate1", estimatedBytes[firstValidStorageServer])
.detail("ByteEstimate2", numBytes)
.detail("Begin", printable(shard.begin))
.detail("End", printable(shard.end))
.detail("Begin", shard.begin)
.detail("End", shard.end)
.detail("StorageServer1", storageServers[firstValidStorageServer].id())
.detail("StorageServer2", storageServers[i].id())
.detail("IsTSS",
@ -1451,17 +1451,17 @@ struct ConsistencyCheckWorkload : TestWorkload {
.detail(format("StorageServer%d", j).c_str(), storageServers[j].toString())
.detail(format("StorageServer%d", firstValidServer).c_str(),
storageServers[firstValidServer].toString())
.detail("ShardBegin", printable(req.begin.getKey()))
.detail("ShardEnd", printable(req.end.getKey()))
.detail("ShardBegin", req.begin.getKey())
.detail("ShardEnd", req.end.getKey())
.detail("VersionNumber", req.version)
.detail(format("Server%dUniques", j).c_str(), currentUniques)
.detail(format("Server%dUniqueKey", j).c_str(), printable(currentUniqueKey))
.detail(format("Server%dUniqueKey", j).c_str(), currentUniqueKey)
.detail(format("Server%dUniques", firstValidServer).c_str(),
referenceUniques)
.detail(format("Server%dUniqueKey", firstValidServer).c_str(),
printable(referenceUniqueKey))
referenceUniqueKey)
.detail("ValueMismatches", valueMismatches)
.detail("ValueMismatchKey", printable(valueMismatchKey))
.detail("ValueMismatchKey", valueMismatchKey)
.detail("MatchingKVPairs", matchingKVPairs)
.detail("IsTSS",
storageServerInterfaces[j].isTss() ||
@ -1673,7 +1673,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
if (bytesReadInRange > 0) {
TraceEvent("ConsistencyCheck_ReadRange")
.suppressFor(1.0)
.detail("Range", printable(range))
.detail("Range", range)
.detail("BytesRead", bytesReadInRange);
}
}

View File

@ -258,13 +258,13 @@ struct SerializabilityWorkload : TestWorkload {
} else if (ops[opNum].mutationOp.present()) {
auto& op = ops[opNum].mutationOp.get();
if (op.type == MutationRef::SetValue) {
//TraceEvent("SRL_Set").detail("Mutation", op.toString());
//TraceEvent("SRL_Set").detail("Mutation", op);
tr->set(op.param1, op.param2);
} else if (op.type == MutationRef::ClearRange) {
//TraceEvent("SRL_Clear").detail("Mutation", op.toString());
//TraceEvent("SRL_Clear").detail("Mutation", op);
tr->clear(KeyRangeRef(op.param1, op.param2));
} else {
//TraceEvent("SRL_AtomicOp").detail("Mutation", op.toString());
//TraceEvent("SRL_AtomicOp").detail("Mutation", op);
tr->atomicOp(op.param1, op.param2, op.type);
}
} else if (ops[opNum].readConflictOp.present()) {

View File

@ -219,6 +219,7 @@ public:
template <class U>
Optional(const U& t) : impl(std::in_place, t) {}
Optional(T&& t) : impl(std::in_place, std::move(t)) {}
/* This conversion constructor was nice, but combined with the prior constructor it means that Optional<int> can be
converted to Optional<Optional<int>> in the wrong way (a non-present Optional<int> converts to a non-present

View File

@ -117,10 +117,12 @@ void Histogram::writeToLog() {
TraceEvent e(SevInfo, "Histogram");
e.detail("Group", group).detail("Op", op).detail("Unit", UnitToStringMapper[(size_t)unit]);
int totalCount = 0;
for (uint32_t i = 0; i < 32; i++) {
uint64_t value = uint64_t(1) << (i + 1);
if (buckets[i]) {
totalCount += buckets[i];
switch (unit) {
case Unit::microseconds:
e.detail(format("LessThan%u.%03u", value / 1000, value % 1000), buckets[i]);
@ -140,6 +142,7 @@ void Histogram::writeToLog() {
}
}
}
e.detail("TotalCount", totalCount);
}
std::string Histogram::drawHistogram() {
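
The reporting loop above treats bucket i as everything below 2^(i+1), which implies samples are binned by the power of two of their magnitude. A hedged sketch of that binning, inferred from the reporting loop rather than taken from the flow source:

#include <cstdint>
#include <cstdio>

// Assumed layout: bucket i holds samples in [2^i, 2^(i+1)); 0 lands in bucket 0.
uint32_t bucketFor(uint64_t sample) {
	uint32_t i = 0;
	while (i < 31 && (uint64_t(1) << (i + 1)) <= sample) {
		++i;
	}
	return i;
}

int main() {
	std::printf("%u\n", bucketFor(1500)); // 1500us -> bucket 10, reported as "LessThan2.048"
}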

View File

@ -90,6 +90,7 @@ EncryptionStreamCipher::EncryptionStreamCipher(const StreamCipher::Key& key, con
}
StringRef EncryptionStreamCipher::encrypt(unsigned char const* plaintext, int len, Arena& arena) {
TEST(true); // Encrypting data with StreamCipher
auto ciphertext = new (arena) unsigned char[len + AES_BLOCK_SIZE];
int bytes{ 0 };
EVP_EncryptUpdate(cipher.getCtx(), ciphertext, &bytes, plaintext, len);
@ -110,6 +111,7 @@ DecryptionStreamCipher::DecryptionStreamCipher(const StreamCipher::Key& key, con
}
StringRef DecryptionStreamCipher::decrypt(unsigned char const* ciphertext, int len, Arena& arena) {
TEST(true); // Decrypting data with StreamCipher
auto plaintext = new (arena) unsigned char[len];
int bytesDecrypted{ 0 };
EVP_DecryptUpdate(cipher.getCtx(), plaintext, &bytesDecrypted, ciphertext, len);

View File

@ -52,6 +52,7 @@ class ThreadSafeQueue : NonCopyable {
struct Node : BaseNode, FastAllocated<Node> {
T data;
Node(T const& data) : data(data) {}
Node(T&& data) : data(std::move(data)) {}
};
std::atomic<BaseNode*> head;
BaseNode* tail;
@ -131,9 +132,9 @@ public:
}
// If push() returns true, the consumer may be sleeping and should be woken
bool push(T const& data) {
Node* n = new Node(data);
n->data = data;
template <class U>
bool push(U&& data) {
Node* n = new Node(std::forward<U>(data));
return pushNode(n) == &sleeping;
}
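
The new push() is the standard perfect-forwarding pattern: a single template overload accepts both lvalues and rvalues, and std::forward routes construction to Node(T const&) or Node(T&&). A self-contained sketch (MiniQueue is illustrative, not the flow class):

#include <string>
#include <utility>
#include <vector>

template <class T>
class MiniQueue {
	struct Node {
		T data;
		Node(T const& d) : data(d) {}       // lvalue pushes copy
		Node(T&& d) : data(std::move(d)) {} // rvalue pushes move
	};
	std::vector<Node> nodes;

public:
	template <class U>
	void push(U&& data) {
		nodes.emplace_back(std::forward<U>(data)); // preserves the caller's value category
	}
};

int main() {
	MiniQueue<std::string> q;
	std::string s = "copied";
	q.push(s);                    // binds U = std::string& -> Node(T const&)
	q.push(std::string("moved")); // binds U = std::string  -> Node(T&&)
}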

View File

@ -107,7 +107,7 @@ private:
std::vector<TraceEventFields> eventBuffer;
int loggedLength;
int bufferLength;
bool opened;
std::atomic<bool> opened;
int64_t preopenOverflowCount;
std::string basename;
std::string logGroup;
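
Promoting `opened` to std::atomic<bool> makes the flag safe to read from a thread other than the one that sets it. A minimal sketch of that handoff (the thread bodies are placeholders):

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> opened{ false };

int main() {
	std::thread writer([] {
		// ... open the trace log file ...
		opened.store(true, std::memory_order_release);
	});
	while (!opened.load(std::memory_order_acquire)) {
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
	}
	writer.join();
}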