merge upstream/main

This commit is contained in:
Xiaoxi Wang 2022-07-12 14:28:49 -07:00
commit d3686143a1
32 changed files with 524 additions and 277 deletions

View File

@ -53,6 +53,15 @@ add_dependencies(fdb_c fdb_c_generated fdb_c_options)
add_dependencies(fdbclient fdb_c_options)
add_dependencies(fdbclient_sampling fdb_c_options)
target_link_libraries(fdb_c PRIVATE $<BUILD_INTERFACE:fdbclient>)
if(USE_UBSAN)
# The intent of this hack is to force c targets that depend on fdb_c to use
# c++ as their linker language. Otherwise you see undefined references to c++
# specific ubsan symbols.
add_library(force_cxx_linker STATIC IMPORTED)
set_property(TARGET force_cxx_linker PROPERTY IMPORTED_LOCATION /dev/null)
set_target_properties(force_cxx_linker PROPERTIES IMPORTED_LINK_INTERFACE_LANGUAGES CXX)
target_link_libraries(fdb_c PUBLIC $<BUILD_INTERFACE:force_cxx_linker>)
endif()
if(APPLE)
set(symbols ${CMAKE_CURRENT_BINARY_DIR}/fdb_c.symbols)
add_custom_command(OUTPUT ${symbols}
@ -67,7 +76,12 @@ if(APPLE)
target_link_options(fdb_c PRIVATE "LINKER:-no_weak_exports,-exported_symbols_list,${symbols}")
elseif(WIN32)
else()
target_link_options(fdb_c PRIVATE "LINKER:--version-script=${CMAKE_CURRENT_SOURCE_DIR}/fdb_c.map,-z,nodelete,-z,noexecstack")
if (NOT USE_UBSAN)
# For ubsan we need to export type information for the vptr check to work.
# Otherwise we only want to export fdb symbols in the fdb c api.
target_link_options(fdb_c PRIVATE "LINKER:--version-script=${CMAKE_CURRENT_SOURCE_DIR}/fdb_c.map")
endif()
target_link_options(fdb_c PRIVATE "LINKER:-z,nodelete,-z,noexecstack")
endif()
target_include_directories(fdb_c PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>
@ -112,8 +126,8 @@ if(NOT WIN32)
test/unit/fdb_api.cpp
test/unit/fdb_api.hpp)
add_library(fdb_cpp INTERFACE)
target_sources(fdb_cpp INTERFACE test/fdb_api.hpp)
add_library(fdb_cpp INTERFACE test/fdb_api.hpp)
target_sources(fdb_cpp INTERFACE )
target_include_directories(fdb_cpp INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/test)
target_link_libraries(fdb_cpp INTERFACE fmt::fmt)
@ -187,19 +201,13 @@ if(NOT WIN32)
endif()
target_link_libraries(fdb_c_api_tester_impl PRIVATE SimpleOpt)
add_dependencies(fdb_c_setup_tests doctest)
add_dependencies(fdb_c_unit_tests_impl doctest)
add_dependencies(fdb_c_unit_tests_version_510 doctest)
add_dependencies(disconnected_timeout_unit_tests doctest)
target_include_directories(fdb_c_setup_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_include_directories(fdb_c_unit_tests_impl PUBLIC ${DOCTEST_INCLUDE_DIR} ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/)
target_include_directories(fdb_c_unit_tests_version_510 PUBLIC ${DOCTEST_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/flow/include)
target_include_directories(disconnected_timeout_unit_tests PUBLIC ${DOCTEST_INCLUDE_DIR})
target_link_libraries(fdb_c_setup_tests PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_unit_tests_impl PRIVATE fdb_c Threads::Threads fdbclient rapidjson)
target_link_libraries(fdb_c_unit_tests_version_510 PRIVATE fdb_c Threads::Threads)
target_link_libraries(trace_partial_file_suffix_test PRIVATE fdb_c Threads::Threads flow)
target_link_libraries(disconnected_timeout_unit_tests PRIVATE fdb_c Threads::Threads)
target_include_directories(fdb_c_unit_tests_impl PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}/foundationdb/)
target_include_directories(fdb_c_unit_tests_version_510 PUBLIC ${CMAKE_BINARY_DIR}/flow/include)
target_link_libraries(fdb_c_setup_tests PRIVATE fdb_c Threads::Threads doctest)
target_link_libraries(fdb_c_unit_tests_impl PRIVATE fdb_c Threads::Threads fdbclient rapidjson doctest)
target_link_libraries(fdb_c_unit_tests_version_510 PRIVATE fdb_c Threads::Threads doctest)
target_link_libraries(trace_partial_file_suffix_test PRIVATE fdb_c Threads::Threads flow doctest)
target_link_libraries(disconnected_timeout_unit_tests PRIVATE fdb_c Threads::Threads doctest)
# do not set RPATH for mako
set_property(TARGET mako PROPERTY SKIP_BUILD_RPATH TRUE)
@ -426,7 +434,12 @@ if (NOT WIN32 AND NOT APPLE AND NOT OPEN_FOR_IDE)
endif()
# Generate shim library in Linux builds
if (NOT WIN32 AND NOT APPLE AND NOT OPEN_FOR_IDE)
if (OPEN_FOR_IDE)
add_library(fdb_c_shim OBJECT fdb_c_shim.cpp)
target_link_libraries(fdb_c_shim PUBLIC dl)
elseif(NOT WIN32 AND NOT APPLE AND NOT OPEN_FOR_IDE) # Linux Only
set(SHIM_LIB_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR})

View File

@ -3,7 +3,7 @@ include(ExternalProject)
find_package(Git REQUIRED)
ExternalProject_Add(
doctest
doctest_proj
PREFIX ${CMAKE_BINARY_DIR}/doctest
GIT_REPOSITORY https://github.com/onqtam/doctest.git
GIT_TAG 7b9885133108ae301ddd16e2651320f54cafeba7 # v2.4.8
@ -14,5 +14,8 @@ ExternalProject_Add(
LOG_DOWNLOAD ON
)
ExternalProject_Get_Property(doctest source_dir)
set(DOCTEST_INCLUDE_DIR ${source_dir}/doctest CACHE INTERNAL "Path to include folder for doctest")
ExternalProject_Get_Property(doctest_proj source_dir)
set(DOCTEST_INCLUDE_DIR "${source_dir}/doctest" CACHE INTERNAL "Path to include folder for doctest")
add_library(doctest INTERFACE)
add_dependencies(doctest doctest_proj)
target_include_directories(doctest INTERFACE "${DOCTEST_INCLUDE_DIR}")

View File

@ -86,7 +86,7 @@ public class Tuple implements Comparable<Tuple>, Iterable<Object> {
private Tuple(List<Object> elements) {
this.elements = elements;
incompleteVersionstamp = TupleUtil.hasIncompleteVersionstamp(elements.stream());
incompleteVersionstamp = TupleUtil.hasIncompleteVersionstamp(elements);
}
/**
@ -265,7 +265,7 @@ public class Tuple implements Comparable<Tuple>, Iterable<Object> {
* @return a newly created {@code Tuple}
*/
public Tuple add(List<?> l) {
return new Tuple(this, l, TupleUtil.hasIncompleteVersionstamp(l.stream()));
return new Tuple(this, l, TupleUtil.hasIncompleteVersionstamp(l));
}
/**
@ -483,6 +483,15 @@ public class Tuple implements Comparable<Tuple>, Iterable<Object> {
return new ArrayList<>(elements);
}
/**
* Gets the unserialized contents of this {@code Tuple} without copying into a new list.
*
* @return the elements that make up this {@code Tuple}.
*/
List<Object> getRawItems() {
return elements;
}
/**
* Gets a {@link Stream} of the unserialized contents of this {@code Tuple}.
*

View File

@ -776,24 +776,26 @@ class TupleUtil {
return packedSize;
}
static boolean hasIncompleteVersionstamp(Stream<?> items) {
return items.anyMatch(item -> {
static boolean hasIncompleteVersionstamp(Collection<?> items) {
boolean hasIncompleteVersionstamp = false;
for (Object item: items) {
if(item == null) {
return false;
continue;
}
else if(item instanceof Versionstamp) {
return !((Versionstamp) item).isComplete();
hasIncompleteVersionstamp = !((Versionstamp) item).isComplete();
}
else if(item instanceof Tuple) {
return hasIncompleteVersionstamp(((Tuple) item).stream());
hasIncompleteVersionstamp = hasIncompleteVersionstamp(((Tuple) item).getRawItems());
}
else if(item instanceof Collection<?>) {
return hasIncompleteVersionstamp(((Collection<?>) item).stream());
hasIncompleteVersionstamp = hasIncompleteVersionstamp( (Collection<?>) item);
}
else {
return false;
if (hasIncompleteVersionstamp) {
return hasIncompleteVersionstamp;
}
});
};
return hasIncompleteVersionstamp;
}
public static void main(String[] args) {

View File

@ -699,6 +699,15 @@ def tenants(logger):
run_fdbcli_command('writemode on; clear tenant_test')
def integer_options():
process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fdbcli_env)
cmd_sequence = ['option on TIMEOUT 1000', 'writemode on', 'clear foo']
output, error_output = process.communicate(input='\n'.join(cmd_sequence).encode())
lines = output.decode().strip().split('\n')[-2:]
assert lines[0] == 'Option enabled for all transactions'
assert lines[1].startswith('Committed')
assert error_output == b''
if __name__ == '__main__':
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
@ -745,6 +754,7 @@ if __name__ == '__main__':
triggerddteaminfolog()
tenants()
versionepoch()
integer_options()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()

View File

@ -189,7 +189,7 @@ endfunction()
function(add_flow_target)
set(options EXECUTABLE STATIC_LIBRARY
DYNAMIC_LIBRARY)
DYNAMIC_LIBRARY LINK_TEST)
set(oneValueArgs NAME)
set(multiValueArgs SRCS COVERAGE_FILTER_OUT DISABLE_ACTOR_DIAGNOSTICS ADDL_SRCS)
cmake_parse_arguments(AFT "${options}" "${oneValueArgs}" "${multiValueArgs}" "${ARGN}")
@ -277,6 +277,12 @@ function(add_flow_target)
set(strip_target ON)
add_library(${AFT_NAME} DYNAMIC ${sources} ${AFT_ADDL_SRCS})
endif()
if(AFT_LINK_TEST)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin/linktest)
set(strip_target ON)
set(target_type exec)
add_executable(${AFT_NAME} ${sources} ${AFT_ADDL_SRCS})
endif()
foreach(src IN LISTS sources AFT_ADDL_SRCS)
get_filename_component(dname ${CMAKE_CURRENT_SOURCE_DIR} NAME)

View File

@ -1,144 +0,0 @@
# TODO:
# * Android
# * FreeBSD
name: CI
on:
push:
paths-ignore:
- 'LICENSE.txt'
- 'README.md'
pull_request:
paths-ignore:
- 'LICENSE.txt'
- 'README.md'
jobs:
Baseline:
strategy:
fail-fast: false
matrix:
os: [ubuntu-18.04, ubuntu-20.04, ubuntu-latest]
cc: [[gcc, g++], [clang, clang++]]
py: [python3.6, python3.7, python3] # We need f-strings so 3.6+
runs-on: ${{ matrix.os }}
env:
CC: ${{ matrix.cc[0] }}
CXX: ${{ matrix.cc[1] }}
PYTHON: ${{ matrix.py }}
steps:
- uses: actions/checkout@v2
- name: Install deps
run: |
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install ${PYTHON}
sudo apt-get install ${PYTHON}-pip || true
- name: Run tests
run: scripts/travis.sh
PyPy: # Can't test this in matrix, pypy package has different names in old distros
runs-on: ubuntu-latest
env:
PYTHON: pypy3
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install ${PYTHON}
- name: Run tests
run: scripts/travis.sh
Pylint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install pylint
- name: Run tests
run: |
pylint implib-gen.py
pylint scripts/ld
Coverage:
runs-on: ubuntu-latest
environment: secrets
env:
COVERAGE: 1
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
PYTHON: 'coverage run -a'
steps:
- uses: actions/checkout@v2
- name: Install deps
run: |
sudo apt-get install python3 python3-pip
sudo python3 -mpip install codecov
- name: Run tests
run: scripts/travis.sh
- name: Upload coverage
run: |
for t in tests/*; do
if test -d $t; then
(cd $t && coverage xml)
fi
done
codecov --required
x86:
runs-on: ubuntu-latest
env:
ARCH: i386-linux-gnueabi
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install gcc-multilib g++-multilib
- name: Run tests
run: scripts/travis.sh
arm-arm:
runs-on: ubuntu-latest
env:
ARCH: arm-linux-gnueabi
CFLAGS: -marm
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install qemu-user gcc-arm-linux-gnueabi g++-arm-linux-gnueabi binutils-arm-linux-gnueabi libc6-armel-cross libc6-dev-armel-cross
- name: Run tests
run: scripts/travis.sh
arm-thumb:
runs-on: ubuntu-latest
env:
ARCH: arm-linux-gnueabi
CFLAGS: -mthumb
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install qemu-user gcc-arm-linux-gnueabi g++-arm-linux-gnueabi binutils-arm-linux-gnueabi libc6-armel-cross libc6-dev-armel-cross
- name: Run tests
run: scripts/travis.sh
armhf-arm:
runs-on: ubuntu-latest
env:
ARCH: arm-linux-gnueabihf
CFLAGS: -marm
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install qemu-user gcc-arm-linux-gnueabihf g++-arm-linux-gnueabihf binutils-arm-linux-gnueabihf libc6-armhf-cross libc6-dev-armhf-cross
- name: Run tests
run: scripts/travis.sh
armhf-thumb:
runs-on: ubuntu-latest
env:
ARCH: arm-linux-gnueabihf
CFLAGS: -mthumb
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install qemu-user gcc-arm-linux-gnueabihf g++-arm-linux-gnueabihf binutils-arm-linux-gnueabihf libc6-armhf-cross libc6-dev-armhf-cross
- name: Run tests
run: scripts/travis.sh
aarch64:
runs-on: ubuntu-latest
env:
ARCH: aarch64-linux-gnueabi
steps:
- uses: actions/checkout@v2
- name: Install deps
run: sudo apt-get install qemu-user gcc-aarch64-linux-gnu g++-aarch64-linux-gnu binutils-aarch64-linux-gnu libc6-arm64-cross libc6-dev-arm64-cross
- name: Run tests
run: scripts/travis.sh

View File

@ -45,6 +45,7 @@ static unsigned int crc32_align(unsigned int crc, unsigned char* p, unsigned lon
unsigned int CRC32_FUNCTION_ASM(unsigned int crc, unsigned char* p, unsigned long len);
unsigned int CRC32_FUNCTION(unsigned int crc, unsigned char* p, unsigned long len) {
#ifdef __powerpc64 // avoid link failures on systems without CRC32_FUNCTION_ASM declared
unsigned int prealign;
unsigned int tail;
@ -76,6 +77,6 @@ out:
#ifdef CRC_XOR
crc ^= 0xffffffff;
#endif
#endif
return crc;
}

View File

@ -1,9 +1,19 @@
.. _release-notes:
#############
Release Notes
#############
7.1.15
======
* Same as 7.1.14 release with AVX enabled.
7.1.14
======
* Released with AVX disabled.
* Fixed a high commit latency bug when there are data movement. `(PR #7548) <https://github.com/apple/foundationdb/pull/7548>`_
* Fixed the primary locality on the sequencer by obtaining it from cluster controller. `(PR #7535) <https://github.com/apple/foundationdb/pull/7535>`_
* Added StorageEngine type to StorageMetrics trace events. `(PR #7546) <https://github.com/apple/foundationdb/pull/7546>`_
* Improved hasIncompleteVersionstamp performance in Java binding to use iteration rather than stream processing. `(PR #7559) <https://github.com/apple/foundationdb/pull/7559>`_
7.1.13
======
* Same as 7.1.12 release with AVX enabled.

View File

@ -0,0 +1,58 @@
.. _release-notes:
#############
Release Notes
#############
7.2.0
======
Features
--------
Performance
-----------
Reliability
-----------
Fixes
-----
* In ``fdbcli``, integer options are now expressed as integers rather than byte strings (e.g. ``option on TIMEOUT 1000``). `(PR #7571) <https://github.com/apple/foundationdb/pull/7571>`_
Status
------
Bindings
--------
Other Changes
-------------
Earlier release notes
---------------------
* :doc:`7.1 (API Version 710) </release-notes/release-notes-710>`
* :doc:`7.0 (API Version 700) </release-notes/release-notes-700>`
* :doc:`6.3 (API Version 630) </release-notes/release-notes-630>`
* :doc:`6.2 (API Version 620) </release-notes/release-notes-620>`
* :doc:`6.1 (API Version 610) </release-notes/release-notes-610>`
* :doc:`6.0 (API Version 600) </release-notes/release-notes-600>`
* :doc:`5.2 (API Version 520) </release-notes/release-notes-520>`
* :doc:`5.1 (API Version 510) </release-notes/release-notes-510>`
* :doc:`5.0 (API Version 500) </release-notes/release-notes-500>`
* :doc:`4.6 (API Version 460) </release-notes/release-notes-460>`
* :doc:`4.5 (API Version 450) </release-notes/release-notes-450>`
* :doc:`4.4 (API Version 440) </release-notes/release-notes-440>`
* :doc:`4.3 (API Version 430) </release-notes/release-notes-430>`
* :doc:`4.2 (API Version 420) </release-notes/release-notes-420>`
* :doc:`4.1 (API Version 410) </release-notes/release-notes-410>`
* :doc:`4.0 (API Version 400) </release-notes/release-notes-400>`
* :doc:`3.0 (API Version 300) </release-notes/release-notes-300>`
* :doc:`2.0 (API Version 200) </release-notes/release-notes-200>`
* :doc:`1.0 (API Version 100) </release-notes/release-notes-100>`
* :doc:`Beta 3 (API Version 23) </release-notes/release-notes-023>`
* :doc:`Beta 2 (API Version 22) </release-notes/release-notes-022>`
* :doc:`Beta 1 (API Version 21) </release-notes/release-notes-021>`
* :doc:`Alpha 6 (API Version 16) </release-notes/release-notes-016>`
* :doc:`Alpha 5 (API Version 14) </release-notes/release-notes-014>`

View File

@ -201,9 +201,31 @@ private:
bool enabled,
Optional<StringRef> arg,
bool intrans) {
if (enabled && arg.present() != FDBTransactionOptions::optionInfo.getMustExist(option).hasParameter) {
fprintf(stderr, "ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected");
throw invalid_option_value();
// If the parameter type is an int, we will extract into this variable and reference its memory with a StringRef
int64_t parsedInt = 0;
if (enabled) {
auto optionInfo = FDBTransactionOptions::optionInfo.getMustExist(option);
if (arg.present() != optionInfo.hasParameter) {
fprintf(stderr, "ERROR: option %s a parameter\n", arg.present() ? "did not expect" : "expected");
throw invalid_option_value();
}
if (arg.present() && optionInfo.paramType == FDBOptionInfo::ParamType::Int) {
try {
size_t nextIdx;
std::string value = arg.get().toString();
parsedInt = std::stoll(value, &nextIdx);
if (nextIdx != value.length()) {
fprintf(
stderr, "ERROR: could not parse value `%s' as an integer\n", arg.get().toString().c_str());
throw invalid_option_value();
}
arg = StringRef(reinterpret_cast<uint8_t*>(&parsedInt), 8);
} catch (std::exception e) {
fprintf(stderr, "ERROR: could not parse value `%s' as an integer\n", arg.get().toString().c_str());
throw invalid_option_value();
}
}
}
if (intrans) {

View File

@ -89,6 +89,9 @@ if(WIN32)
add_dependencies(fdbclient_sampling_actors fdbclient_actors)
endif()
add_flow_target(LINK_TEST NAME fdbclientlinktest SRCS ${FDBCLIENT_SRCS} LinkTest.cpp ADDL_SRCS ${options_srcs})
target_link_libraries(fdbclientlinktest PRIVATE fdbclient rapidxml) # re-link rapidxml due to private link interface
if(BUILD_AZURE_BACKUP)
target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite)
target_link_libraries(fdbclient_sampling PRIVATE curl uuid azure-storage-lite)

8
fdbclient/LinkTest.cpp Normal file
View File

@ -0,0 +1,8 @@
// When creating a static or shared library, undefined symbols will be ignored.
// Since we want to ensure no symbols from other modules are used, each module
// will create an executable so the linker will throw errors if it can't find
// the declaration of a symbol. This class defines a dummy main function so the
// executable can be built.
int main() {
return 0;
}

View File

@ -4719,6 +4719,9 @@ ACTOR Future<Void> getRangeStreamFragment(Reference<TransactionState> trState,
req.spanContext = spanContext;
req.limit = reverse ? -CLIENT_KNOBS->REPLY_BYTE_LIMIT : CLIENT_KNOBS->REPLY_BYTE_LIMIT;
req.limitBytes = std::numeric_limits<int>::max();
// leaving the flag off for now to prevent data fetches stall under heavy load
// it is used to inform the storage that the rangeRead is for Fetch
// req.isFetchKeys = (trState->taskID == TaskPriority::FetchKeys);
trState->cx->getLatestCommitVersions(
locations[shard].locations, req.version, trState, req.ssLatestCommitVersions);

View File

@ -124,6 +124,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( SNAP_NETWORK_FAILURE_RETRY_LIMIT, 10 );
init( MAX_STORAGE_SNAPSHOT_FAULT_TOLERANCE, 1 );
init( MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE, 1 );
init( SNAPSHOT_ALL_STATEFUL_PROCESSES, false ); if ( randomize && BUGGIFY ) SNAPSHOT_ALL_STATEFUL_PROCESSES = true;
// Data distribution queue
init( HEALTH_POLL_TIME, 1.0 );

View File

@ -629,6 +629,9 @@ public:
// Maximum number of coordinators a snapshot can fail to
// capture while still succeeding
int64_t MAX_COORDINATOR_SNAPSHOT_FAULT_TOLERANCE;
// if true, all processes with class "storage", "transaction" and "log" will be snapshotted even not recruited as
// the role
bool SNAPSHOT_ALL_STATEFUL_PROCESSES;
// Storage Metrics
double STORAGE_METRICS_AVERAGE_INTERVAL;

View File

@ -8,7 +8,7 @@ To test this, run the following commands from the root of the FoundationDB
repository:
```bash
mkdir website
mkdir -p website
docker build -t foundationdb/foundationdb-kubernetes:7.1.5-local --target fdb-kubernetes-monitor --build-arg FDB_VERSION=7.1.5 --build-arg FDB_LIBRARY_VERSIONS="7.1.5 6.3.24 6.2.30" -f packaging/docker/Dockerfile .
docker build -t foundationdb/foundationdb-kubernetes:7.1.6-local --target fdb-kubernetes-monitor --build-arg FDB_VERSION=7.1.6 --build-arg FDB_LIBRARY_VERSIONS="7.1.6 6.3.24 6.2.30" -f packaging/docker/Dockerfile .
kubectl apply -f packaging/docker/kubernetes/test_config.yaml
@ -42,3 +42,24 @@ Once you are done, you can tear down the example with the following command:
kubectl delete -f packaging/docker/kubernetes/test_config.yaml
kubectl delete pvc -l app=fdb-kubernetes-example
```
### FDB Kubernetes operator
The following steps assume that you already have a [local development](https://github.com/FoundationDB/fdb-kubernetes-operator#local-development) setup for the fdb-kubernetes-operator.
```bash
mkdir -p website
# Change this version if you want to create a cluster with a different version
docker build -t foundationdb/foundationdb-kubernetes:7.1.11-local --target fdb-kubernetes-monitor --build-arg FDB_VERSION=7.1.11 --build-arg FDB_LIBRARY_VERSIONS="7.1.11 6.3.24 6.2.30" -f packaging/docker/Dockerfile .
```
Depending on the local Kubernetes setup you use you might have to push the newly build image to a local registry.
Now you should change to the directoy that contains the [fdb-kubernetes-operator](https://github.com/FoundationDB/fdb-kubernetes-operator) repository.
In the top directory run:
```bash
# Adjust the version for the cluster if it differs from the build image
vim ./config/tests/base/cluster.yaml
# Now you can create the cluster
kubectl apply -k ./config/tests/unified_image
```

View File

@ -25,6 +25,10 @@ add_flow_target(STATIC_LIBRARY NAME fdbrpc_sampling
SRCS ${FDBRPC_SRCS}
DISABLE_ACTOR_DIAGNOSTICS ${FDBRPC_SRCS_DISABLE_ACTOR_DIAGNOSTICS})
add_flow_target(LINK_TEST NAME fdbrpclinktest SRCS ${FDBRPC_SRCS} LinkTest.cpp DISABLE_ACTOR_DIAGNOSTICS ${FDBRPC_SRCS_DISABLE_ACTOR_DIAGNOSTICS})
target_link_libraries(fdbrpclinktest PRIVATE fdbrpc rapidjson)
target_include_directories(fdbrpclinktest PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
if(COMPILE_EIO)
add_library(eio STATIC libeio/eio.c)
if(USE_VALGRIND)

8
fdbrpc/LinkTest.cpp Normal file
View File

@ -0,0 +1,8 @@
// When creating a static or shared library, undefined symbols will be ignored.
// Since we want to ensure no symbols from other modules are used, each module
// will create an executable so the linker will throw errors if it can't find
// the declaration of a symbol. This class defines a dummy main function so the
// executable can be built.
int main() {
return 0;
}

View File

@ -215,7 +215,6 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
state Reference<ClusterRecoveryData> recoveryData;
state PromiseStream<Future<Void>> addActor;
state Future<Void> recoveryCore;
state bool recoveredDisk = false;
// SOMEDAY: If there is already a non-failed master referenced by zkMasterInfo, use that one until it fails
// When this someday is implemented, make sure forced failures still cause the master to be recruited again
@ -258,18 +257,6 @@ ACTOR Future<Void> clusterWatchDatabase(ClusterControllerData* cluster,
.detail("ChangeID", dbInfo.id);
db->serverInfo->set(dbInfo);
if (SERVER_KNOBS->ENABLE_ENCRYPTION && !recoveredDisk) {
// EKP singleton recruitment waits for 'Master/Sequencer' recruitment, execute wait for
// 'recoveredDiskFiles' optimization once EKP recruitment is unblocked to avoid circular dependencies
// with StorageServer initialization. The waiting for recoveredDiskFiles is to make sure the worker
// server on the same process has been registered with the new CC before recruitment.
wait(recoveredDiskFiles);
TraceEvent("CCWDB_RecoveredDiskFiles", cluster->id).log();
// Need to be done for the first once in the lifetime of ClusterController
recoveredDisk = true;
}
state Future<Void> spinDelay = delay(
SERVER_KNOBS
->MASTER_SPIN_DELAY); // Don't retry cluster recovery more than once per second, but don't delay
@ -1142,7 +1129,8 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
.detail("ZoneId", w.locality.zoneId())
.detail("DataHall", w.locality.dataHallId())
.detail("PClass", req.processClass.toString())
.detail("Workers", self->id_worker.size());
.detail("Workers", self->id_worker.size())
.detail("RecoveredDiskFiles", req.recoveredDiskFiles);
self->goodRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY);
self->goodRemoteRecruitmentTime = lowPriorityDelay(SERVER_KNOBS->WAIT_FOR_GOOD_REMOTE_RECRUITMENT_DELAY);
} else {
@ -1154,7 +1142,8 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
.detail("DataHall", w.locality.dataHallId())
.detail("PClass", req.processClass.toString())
.detail("Workers", self->id_worker.size())
.detail("Degraded", req.degraded);
.detail("Degraded", req.degraded)
.detail("RecoveredDiskFiles", req.recoveredDiskFiles);
}
if (w.address() == g_network->getLocalAddress()) {
if (self->changingDcIds.get().first) {
@ -1207,6 +1196,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
newProcessClass,
newPriorityInfo,
req.degraded,
req.recoveredDiskFiles,
req.issues);
if (!self->masterProcessId.present() &&
w.locality.processId() == self->db.serverInfo->get().master.locality.processId()) {
@ -1232,6 +1222,7 @@ ACTOR Future<Void> registerWorker(RegisterWorkerRequest req,
info->second.priorityInfo = newPriorityInfo;
info->second.initialClass = req.initialClass;
info->second.details.degraded = req.degraded;
info->second.details.recoveredDiskFiles = req.recoveredDiskFiles;
info->second.gen = req.generation;
info->second.issues = req.issues;

View File

@ -364,6 +364,37 @@ class DDTxnProcessorImpl {
return result;
}
ACTOR static Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
state Transaction tr(cx);
loop {
wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution));
try {
Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
if (!mode.present() && ddEnabledState->isDDEnabled()) {
TraceEvent("WaitForDDEnabledSucceeded").log();
return Void();
}
if (mode.present()) {
BinaryReader rd(mode.get(), Unversioned());
int m;
rd >> m;
TraceEvent(SevDebug, "WaitForDDEnabled")
.detail("Mode", m)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
if (m && ddEnabledState->isDDEnabled()) {
TraceEvent("WaitForDDEnabledSucceeded").log();
return Void();
}
}
tr.reset();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
};
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
@ -395,4 +426,8 @@ Future<Reference<InitialDataDistribution>> DDTxnProcessor::getInitialDataDistrib
const std::vector<Optional<Key>>& remoteDcIds,
const DDEnabledState* ddEnabledState) {
return DDTxnProcessorImpl::getInitialDataDistribution(cx, distributorId, moveKeysLock, remoteDcIds, ddEnabledState);
}
Future<Void> DDTxnProcessor::waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const {
return DDTxnProcessorImpl::waitForDataDistributionEnabled(cx, ddEnabledState);
}

View File

@ -192,37 +192,6 @@ ACTOR Future<Void> remoteRecovered(Reference<AsyncVar<ServerDBInfo> const> db) {
return Void();
}
ACTOR Future<Void> waitForDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
state Transaction tr(cx);
loop {
wait(delay(SERVER_KNOBS->DD_ENABLED_CHECK_DELAY, TaskPriority::DataDistribution));
try {
Optional<Value> mode = wait(tr.get(dataDistributionModeKey));
if (!mode.present() && ddEnabledState->isDDEnabled()) {
TraceEvent("WaitForDDEnabledSucceeded").log();
return Void();
}
if (mode.present()) {
BinaryReader rd(mode.get(), Unversioned());
int m;
rd >> m;
TraceEvent(SevDebug, "WaitForDDEnabled")
.detail("Mode", m)
.detail("IsDDEnabled", ddEnabledState->isDDEnabled());
if (m && ddEnabledState->isDDEnabled()) {
TraceEvent("WaitForDDEnabledSucceeded").log();
return Void();
}
}
tr.reset();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR Future<bool> isDataDistributionEnabled(Database cx, const DDEnabledState* ddEnabledState) {
state Transaction tr(cx);
loop {
@ -371,6 +340,10 @@ struct DataDistributor : NonCopyable, ReferenceCounted<DataDistributor> {
remoteDcIds.push_back(regions[1].dcId);
}
}
Future<Void> waitDataDistributorEnabled(const DDEnabledState* ddEnabledState) const {
return txnProcessor->waitForDataDistributionEnabled(ddEnabledState);
}
};
// Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection
@ -471,7 +444,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
.detail("HighestPriority", self->configuration.usableRegions > 1 ? 0 : -1)
.trackLatest(self->totalDataInFlightRemoteEventHolder->trackingKey);
wait(waitForDataDistributionEnabled(cx, ddEnabledState));
wait(self->waitDataDistributorEnabled(ddEnabledState));
TraceEvent("DataDistributionEnabled").log();
}
@ -925,6 +898,24 @@ ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>>
}
}
}
if (SERVER_KNOBS->SNAPSHOT_ALL_STATEFUL_PROCESSES) {
for (const auto& worker : workers) {
const auto& processAddress = worker.interf.address();
// skip processes that are already included
if (result.count(processAddress))
continue;
const auto& processClassType = worker.processClass.classType();
// coordinators are always configured to be recruited
if (processClassType == ProcessClass::StorageClass) {
result[processAddress] = std::make_pair(worker.interf, "storage");
TraceEvent(SevInfo, "SnapUnRecruitedStorageProcess").detail("ProcessAddress", processAddress);
} else if (processClassType == ProcessClass::TransactionClass ||
processClassType == ProcessClass::LogClass) {
result[processAddress] = std::make_pair(worker.interf, "tlog");
TraceEvent(SevInfo, "SnapUnRecruitedLogProcess").detail("ProcessAddress", processAddress);
}
}
}
return result;
} catch (Error& e) {
wait(tr.onError(e));

View File

@ -479,8 +479,12 @@ private:
uint8_t* plaintext = new uint8_t[sizeof(int) + v1.size() + v2.size()];
*(int*)plaintext = op;
memcpy(plaintext + sizeof(int), v1.begin(), v1.size());
memcpy(plaintext + sizeof(int) + v1.size(), v2.begin(), v2.size());
if (v1.size()) {
memcpy(plaintext + sizeof(int), v1.begin(), v1.size());
}
if (v2.size()) {
memcpy(plaintext + sizeof(int) + v1.size(), v2.begin(), v2.size());
}
ASSERT(cipherKeys.cipherTextKey.isValid());
ASSERT(cipherKeys.cipherHeaderKey.isValid());

View File

@ -2221,6 +2221,10 @@ ACTOR Future<Void> commitQueue(TLogData* self) {
break;
}
if (logData->queueCommittedVersion.get() == std::numeric_limits<Version>::max()) {
break;
}
choose {
when(wait(logData->version.whenAtLeast(
std::max(logData->queueCommittingVersion, logData->queueCommittedVersion.get()) + 1))) {

View File

@ -37,7 +37,6 @@
#include "flow/serialize.h"
#include "flow/genericactors.actor.h"
#include "flow/UnitTest.h"
#include "fdbserver/IPager.h"
#include "flow/IAsyncFile.h"
#include "flow/ActorCollection.h"
#include <map>

View File

@ -64,9 +64,10 @@ struct WorkerInfo : NonCopyable {
ProcessClass processClass,
ClusterControllerPriorityInfo priorityInfo,
bool degraded,
bool recoveredDiskFiles,
Standalone<VectorRef<StringRef>> issues)
: watcher(watcher), reply(reply), gen(gen), reboots(0), initialClass(initialClass), priorityInfo(priorityInfo),
details(interf, processClass, degraded), issues(issues) {}
details(interf, processClass, degraded, recoveredDiskFiles), issues(issues) {}
WorkerInfo(WorkerInfo&& r) noexcept
: watcher(std::move(r.watcher)), reply(std::move(r.reply)), gen(r.gen), reboots(r.reboots),
@ -301,7 +302,7 @@ public:
std::set<AddressExclusion> excludedAddresses(req.excludeAddresses.begin(), req.excludeAddresses.end());
for (auto& it : id_worker)
if (workerAvailable(it.second, false) &&
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
(includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())) &&
!addressExcluded(excludedAddresses, it.second.details.interf.address()) &&
@ -316,7 +317,7 @@ public:
Optional<WorkerDetails> bestInfo;
for (auto& it : id_worker) {
ProcessClass::Fitness fit = it.second.details.processClass.machineClassFitness(ProcessClass::Storage);
if (workerAvailable(it.second, false) &&
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
!excludedMachines.count(it.second.details.interf.locality.zoneId()) &&
(includeDCs.size() == 0 || includeDCs.count(it.second.details.interf.locality.dcId())) &&
!addressExcluded(excludedAddresses, it.second.details.interf.address()) && fit < bestFit) {
@ -365,7 +366,8 @@ public:
for (auto& it : id_worker) {
auto fitness = it.second.details.processClass.machineClassFitness(ProcessClass::Storage);
if (workerAvailable(it.second, false) && !conf.isExcludedServer(it.second.details.interf.addresses()) &&
if (workerAvailable(it.second, false) && it.second.details.recoveredDiskFiles &&
!conf.isExcludedServer(it.second.details.interf.addresses()) &&
!isExcludedDegradedServer(it.second.details.interf.addresses()) &&
fitness != ProcessClass::NeverAssign &&
(!dcId.present() || it.second.details.interf.locality.dcId() == dcId.get())) {
@ -597,6 +599,11 @@ public:
logWorkerUnavailable(SevInfo, id, "complex", "Worker is not available", worker_details, fitness, dcIds);
continue;
}
if (!worker_details.recoveredDiskFiles) {
logWorkerUnavailable(
SevInfo, id, "complex", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable(SevInfo,
id,
@ -842,6 +849,11 @@ public:
logWorkerUnavailable(SevInfo, id, "simple", "Worker is not available", worker_details, fitness, dcIds);
continue;
}
if (!worker_details.recoveredDiskFiles) {
logWorkerUnavailable(
SevInfo, id, "simple", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable(SevInfo,
id,
@ -989,6 +1001,11 @@ public:
SevInfo, id, "deprecated", "Worker is not available", worker_details, fitness, dcIds);
continue;
}
if (!worker_details.recoveredDiskFiles) {
logWorkerUnavailable(
SevInfo, id, "deprecated", "Worker disk file recovery unfinished", worker_details, fitness, dcIds);
continue;
}
if (conf.isExcludedServer(worker_details.interf.addresses())) {
logWorkerUnavailable(SevInfo,
id,

View File

@ -59,6 +59,8 @@ public:
const DatabaseConfiguration& configuration) const {
return Void();
}
virtual Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const = 0;
};
class DDTxnProcessorImpl;
@ -90,6 +92,8 @@ public:
Future<Void> updateReplicaKeys(const std::vector<Optional<Key>>& primaryIds,
const std::vector<Optional<Key>>& remoteIds,
const DatabaseConfiguration& configuration) const override;
Future<Void> waitForDataDistributionEnabled(const DDEnabledState* ddEnabledState) const override;
};
// A mock transaction implementation for test usage.

View File

@ -138,10 +138,11 @@ struct WorkerDetails {
WorkerInterface interf;
ProcessClass processClass;
bool degraded;
bool recoveredDiskFiles;
WorkerDetails() : degraded(false) {}
WorkerDetails(const WorkerInterface& interf, ProcessClass processClass, bool degraded)
: interf(interf), processClass(processClass), degraded(degraded) {}
WorkerDetails(const WorkerInterface& interf, ProcessClass processClass, bool degraded, bool recoveredDiskFiles)
: interf(interf), processClass(processClass), degraded(degraded), recoveredDiskFiles(recoveredDiskFiles) {}
bool operator<(const WorkerDetails& r) const { return interf.id() < r.interf.id(); }
@ -436,6 +437,7 @@ struct RegisterWorkerRequest {
Version lastSeenKnobVersion;
ConfigClassSet knobConfigClassSet;
bool requestDbInfo;
bool recoveredDiskFiles;
RegisterWorkerRequest()
: priorityInfo(ProcessClass::UnsetFit, false, ClusterControllerPriorityInfo::FitnessUnknown), degraded(false) {}
@ -450,11 +452,12 @@ struct RegisterWorkerRequest {
Optional<EncryptKeyProxyInterface> ekpInterf,
bool degraded,
Version lastSeenKnobVersion,
ConfigClassSet knobConfigClassSet)
ConfigClassSet knobConfigClassSet,
bool recoveredDiskFiles)
: wi(wi), initialClass(initialClass), processClass(processClass), priorityInfo(priorityInfo),
generation(generation), distributorInterf(ddInterf), ratekeeperInterf(rkInterf), blobManagerInterf(bmInterf),
encryptKeyProxyInterf(ekpInterf), degraded(degraded), lastSeenKnobVersion(lastSeenKnobVersion),
knobConfigClassSet(knobConfigClassSet), requestDbInfo(false) {}
knobConfigClassSet(knobConfigClassSet), requestDbInfo(false), recoveredDiskFiles(recoveredDiskFiles) {}
template <class Ar>
void serialize(Ar& ar) {
@ -474,7 +477,8 @@ struct RegisterWorkerRequest {
degraded,
lastSeenKnobVersion,
knobConfigClassSet,
requestDbInfo);
requestDbInfo,
recoveredDiskFiles);
}
};

View File

@ -565,7 +565,8 @@ ACTOR Future<Void> registrationClient(
Reference<AsyncVar<std::set<std::string>> const> issues,
Reference<ConfigNode> configNode,
Reference<LocalConfiguration> localConfig,
Reference<AsyncVar<ServerDBInfo>> dbInfo) {
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Promise<Void> recoveredDiskFiles) {
// Keeps the cluster controller (as it may be re-elected) informed that this worker exists
// The cluster controller uses waitFailureClient to find out if we die, and returns from registrationReply
// (requiring us to re-register) The registration request piggybacks optional distributor interface if it exists.
@ -600,7 +601,8 @@ ACTOR Future<Void> registrationClient(
ekpInterf->get(),
degraded->get(),
localConfig->lastSeenVersion(),
localConfig->configClassSet());
localConfig->configClassSet(),
recoveredDiskFiles.isSet());
for (auto const& i : issues->get()) {
request.issues.push_back_deep(request.issues.arena(), i);
@ -637,10 +639,15 @@ ACTOR Future<Void> registrationClient(
request.requestDbInfo = true;
firstReg = false;
}
TraceEvent("WorkerRegister")
.detail("CCID", ccInterface->get().get().id())
.detail("Generation", requestGeneration)
.detail("RecoveredDiskFiles", recoveredDiskFiles.isSet());
}
state Future<RegisterWorkerReply> registrationReply =
ccInterfacePresent ? brokenPromiseToNever(ccInterface->get().get().registerWorker.getReply(request))
: Never();
state Future<Void> recovered = recoveredDiskFiles.isSet() ? Never() : recoveredDiskFiles.getFuture();
state double startTime = now();
loop choose {
when(RegisterWorkerReply reply = wait(registrationReply)) {
@ -664,6 +671,7 @@ ACTOR Future<Void> registrationClient(
when(wait(degraded->onChange())) { break; }
when(wait(FlowTransport::transport().onIncompatibleChanged())) { break; }
when(wait(issues->onChange())) { break; }
when(wait(recovered)) { break; }
}
}
}
@ -1589,7 +1597,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
// As (store type, spill type) can map to the same TLogFn across multiple TLogVersions, we need to
// decide if we should collapse them into the same SharedTLog instance as well. The answer
// here is no, so that when running with log_version==3, all files should say V=3.
state std::map<SharedLogsKey, SharedLogsValue> sharedLogs;
state std::map<SharedLogsKey, std::vector<SharedLogsValue>> sharedLogs;
state Reference<AsyncVar<UID>> activeSharedTLog(new AsyncVar<UID>());
state WorkerCache<InitializeBackupReply> backupWorkerCache;
state WorkerCache<InitializeBlobWorkerReply> blobWorkerCache;
@ -1787,32 +1795,29 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
Promise<Void> recovery;
TLogFn tLogFn = tLogFnForOptions(s.tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(s.tLogOptions, s.storeType)];
logData.push_back(SharedLogsValue());
// FIXME: Shouldn't if logData.first isValid && !isReady, shouldn't we
// be sending a fake InitializeTLogRequest rather than calling tLog() ?
Future<Void> tl =
tLogFn(kv,
queue,
dbInfo,
locality,
!logData.actor.isValid() || logData.actor.isReady() ? logData.requests
: PromiseStream<InitializeTLogRequest>(),
s.storeID,
interf.id(),
true,
oldLog,
recovery,
folder,
degraded,
activeSharedTLog);
Future<Void> tl = tLogFn(kv,
queue,
dbInfo,
locality,
logData.back().requests,
s.storeID,
interf.id(),
true,
oldLog,
recovery,
folder,
degraded,
activeSharedTLog);
recoveries.push_back(recovery.getFuture());
activeSharedTLog->set(s.storeID);
tl = handleIOErrors(tl, kv, s.storeID);
tl = handleIOErrors(tl, queue, s.storeID);
if (!logData.actor.isValid() || logData.actor.isReady()) {
logData.actor = oldLog.getFuture() || tl;
logData.uid = s.storeID;
}
logData.back().actor = oldLog.getFuture() || tl;
logData.back().uid = s.storeID;
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, s.storeID, tl));
}
}
@ -1855,8 +1860,31 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
startRole(Role::WORKER, interf.id(), interf.id(), details);
errorForwarders.add(traceRole(Role::WORKER, interf.id()));
wait(waitForAll(recoveries));
recoveredDiskFiles.send(Void());
// We want to avoid the worker being recruited as storage or TLog before recoverying it is local files,
// to make sure:
// (1) the worker can start serving requests once it is recruited as storage or TLog server, and
// (2) a slow recovering worker server wouldn't been recruited as TLog and make recovery slow.
// However, the worker server can still serve stateless roles, and if encryption is on, it is crucial to have
// some worker available to serve the EncryptKeyProxy role, before opening encrypted storage files.
//
// When encryption-at-rest is enabled, the follow code allows a worker to first register with the
// cluster controller to be recruited only as a stateless process i.e. it can't be recruited as a SS or TLog
// process; once the local disk recovery is complete (if applicable), the process re-registers with cluster
// controller as a stateful process role.
//
// TODO(yiwu): Unify behavior for encryption and non-encryption once the change is stable.
Future<Void> recoverDiskFiles = trigger(
[=]() {
TraceEvent("RecoveriesComplete", interf.id());
recoveredDiskFiles.send(Void());
return Void();
},
waitForAll(recoveries));
if (!SERVER_KNOBS->ENABLE_ENCRYPTION) {
wait(recoverDiskFiles);
} else {
errorForwarders.add(recoverDiskFiles);
}
errorForwarders.add(registrationClient(ccInterface,
interf,
@ -1871,7 +1899,8 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
issues,
configNode,
localConfig,
dbInfo));
dbInfo,
recoveredDiskFiles));
if (configNode.isValid()) {
errorForwarders.add(localConfig->consume(interf.configBroadcastInterface));
@ -1881,8 +1910,6 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
errorForwarders.add(healthMonitor(ccInterface, interf, locality, dbInfo));
}
TraceEvent("RecoveriesComplete", interf.id());
loop choose {
when(UpdateServerDBInfoRequest req = waitNext(interf.updateServerDBInfo.getFuture())) {
ServerDBInfo localInfo = BinaryReader::fromStringRef<ServerDBInfo>(
@ -2156,8 +2183,10 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
TLogOptions tLogOptions(req.logVersion, req.spillType);
TLogFn tLogFn = tLogFnForOptions(tLogOptions);
auto& logData = sharedLogs[SharedLogsKey(tLogOptions, req.storeType)];
logData.requests.send(req);
if (!logData.actor.isValid() || logData.actor.isReady()) {
while (!logData.empty() && (!logData.back().actor.isValid() || logData.back().actor.isReady())) {
logData.pop_back();
}
if (logData.empty()) {
UID logId = deterministicRandom()->randomUniqueID();
std::map<std::string, std::string> details;
details["ForMaster"] = req.recruitmentID.shortString();
@ -2183,11 +2212,12 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
filesClosed.add(data->onClosed());
filesClosed.add(queue->onClosed());
logData.push_back(SharedLogsValue());
Future<Void> tLogCore = tLogFn(data,
queue,
dbInfo,
locality,
logData.requests,
logData.back().requests,
logId,
interf.id(),
false,
@ -2199,10 +2229,11 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
tLogCore = handleIOErrors(tLogCore, data, logId);
tLogCore = handleIOErrors(tLogCore, queue, logId);
errorForwarders.add(forwardError(errors, Role::SHARED_TRANSACTION_LOG, logId, tLogCore));
logData.actor = tLogCore;
logData.uid = logId;
logData.back().actor = tLogCore;
logData.back().uid = logId;
}
activeSharedTLog->set(logData.uid);
logData.back().requests.send(req);
activeSharedTLog->set(logData.back().uid);
}
when(InitializeStorageRequest req = waitNext(interf.storage.getFuture())) {
// We want to prevent double recruiting on a worker unless we try to recruit something

View File

@ -2,6 +2,10 @@ find_package(Threads REQUIRED)
fdb_find_sources(FLOW_SRCS)
# Remove files with `main` defined so we can create a link test executable.
list(REMOVE_ITEM FLOW_SRCS TLSTest.cpp)
list(REMOVE_ITEM FLOW_SRCS MkCertCli.cpp)
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
list(APPEND FLOW_SRCS aarch64/memcmp.S aarch64/memcpy.S)
endif()
@ -13,7 +17,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY
add_flow_target(STATIC_LIBRARY NAME flow SRCS ${FLOW_SRCS})
add_flow_target(STATIC_LIBRARY NAME flow_sampling SRCS ${FLOW_SRCS})
foreach(ft flow flow_sampling)
# When creating a static or shared library, undefined symbols will be ignored.
# Since we want to ensure no symbols from other modules are used, create an
# executable so the linker will throw errors if it can't find the declaration
# of a symbol.
add_flow_target(LINK_TEST NAME flowlinktest SRCS ${FLOW_SRCS} LinkTest.cpp)
target_link_libraries(flowlinktest PRIVATE flow stacktrace)
foreach(ft flow flow_sampling flowlinktest)
target_include_directories(${ft} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include")
target_link_libraries(${ft} PRIVATE stacktrace)
@ -68,7 +79,6 @@ if(WIN32)
add_dependencies(flow_sampling_actors flow_actors)
endif()
add_executable(mkcert MkCertCli.cpp)
target_link_libraries(mkcert PUBLIC flow)

8
flow/LinkTest.cpp Normal file
View File

@ -0,0 +1,8 @@
// When creating a static or shared library, undefined symbols will be ignored.
// Since we want to ensure no symbols from other modules are used, each module
// will create an executable so the linker will throw errors if it can't find
// the declaration of a symbol. This class defines a dummy main function so the
// executable can be built.
int main() {
return 0;
}

View File

@ -0,0 +1,108 @@
#!/bin/bash
set -euo pipefail
ROOT=`pwd`
SERVER_COUNT=1
PORT_PREFIX=1500
# default cluster settings, override with options
STATELESS_COUNT=4
LOGS_COUNT=8
STORAGE_COUNT=16
KNOBS=""
LOGS_TASKSET=""
STATELESS_TASKSET=""
STORAGE_TASKSET=""
function usage {
echo "Usage"
printf "\tcd working-directory; ${0} path-to-build-root [OPTIONS]\n\r"
echo "Options"
printf "\t--knobs '--knob-KNOBNAME=KNOBVALUE' \n\r\t\tChanges a database knob. Enclose in single quotes.\n\r"
printf "\t--stateless_count COUNT\n\r\t\t number of stateless daemons to start. Default ${STATELESS_COUNT}\n\r"
printf "\t--stateless_taskset BITMASK\n\r\t\tBitmask of CPUs to pin stateless tasks to. Default is all CPUs.\n\r"
printf "\t--logs_count COUNT\n\r\t\tNumber of stateless daemons to start. Default ${LOGS_COUNT}\n\r"
printf "\t--logs_taskset BITMASK\n\r\t\tbitmask of CPUs to pin logs to. Default is all CPUs.\n\r"
printf "\t--storage_count COUNT\n\r\t\tnumber of storage daemons to start. Default ${STORAGE_COUNT}\n\r"
printf "\t--storage_taskset BITMASK\n\r\t\tBitmask of CPUs to pin storage to. Default is all CPUs.\n\r"
echo "Example"
printf "\t${0} . --knobs '--knob_proxy_use_resolver_private_mutations=1' --stateless_count 4 --stateless_taskset 0xf --logs_count 8 --logs_taskset 0xff0 --storage_taskset 0xffff000\n\r"
exit 1
}
function start_servers {
for j in `seq 1 $1`; do
LOG=${DIR}/${SERVER_COUNT}/log
DATA=${DIR}/${SERVER_COUNT}/data
mkdir -p ${LOG} ${DATA}
PORT=$(( $PORT_PREFIX + $SERVER_COUNT ))
$2 ${FDB} -p auto:${PORT} "$KNOBS" -c $3 -d $DATA -L $LOG -C $CLUSTER &
SERVER_COUNT=$(( $SERVER_COUNT + 1 ))
done
}
if (( $# < 1 )) ; then
echo Wrong number of arguments
usage
fi
if [[ $1 == "-h" || $1 == "--help" ]]; then
usage
fi
BUILD=$1
shift;
while [[ $# -gt 0 ]]; do
case "$1" in
--knobs)
KNOBS="$2"
;;
--stateless_taskset)
STATELESS_TASKSET="taskset ${2}"
;;
--logs_taskset)
LOGS_TASKSET="taskset ${2}"
;;
--storage_taskset)
STORAGE_TASKSET="taskset ${2}"
;;
--stateless_count)
STATELESS_COUNT=$2
;;
--logs_count)
LOGS_COUNT=$2
;;
--storage_count)
STORAGE_COUNT=$2
;;
esac
shift; shift
done
FDB=${BUILD}/bin/fdbserver
if [ ! -f ${FDB} ]; then
echo "Error: ${FDB} not found!"
usage
fi
DIR=./loopback-cluster
rm -rf $DIR
mkdir -p ${DIR}
CLUSTER_FILE="test1:testdb1@127.0.0.1:$(( $PORT_PREFIX + 1))"
CLUSTER=${DIR}/fdb.cluster
echo $CLUSTER_FILE > $CLUSTER
echo "Starting Cluster: " $CLUSTER_FILE
start_servers $STATELESS_COUNT "$STATELESS_TASKSET" stateless
start_servers $LOGS_COUNT "$LOGS_TASKSET" log
start_servers $STORAGE_COUNT "$STORAGE_TASKSET" storage
CLI="$BUILD/bin/fdbcli -C ${CLUSTER} --exec"
echo "configure new ssd single - stand by"
# sleep 2 seconds to wait for workers to join cluster, then configure database
( sleep 2 ; $CLI "configure new ssd single" )