Merge branch 'main' of https://github.com/apple/foundationdb into readaware

This commit is contained in:
Xiaoxi Wang 2022-03-28 14:20:46 -07:00
commit 2bc67a4f1d
47 changed files with 917 additions and 211 deletions

View File

@ -135,6 +135,7 @@ if(NOT WIN32)
add_executable(fdb_c_performance_test test/performance_test.c test/test.h) add_executable(fdb_c_performance_test test/performance_test.c test/test.h)
add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h) add_executable(fdb_c_ryw_benchmark test/ryw_benchmark.c test/test.h)
add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h) add_executable(fdb_c_txn_size_test test/txn_size_test.c test/test.h)
add_executable(fdb_c_client_memory_test test/client_memory_test.cpp test/unit/fdb_api.cpp test/unit/fdb_api.hpp)
add_executable(mako ${MAKO_SRCS}) add_executable(mako ${MAKO_SRCS})
add_executable(fdb_c_setup_tests test/unit/setup_tests.cpp) add_executable(fdb_c_setup_tests test/unit/setup_tests.cpp)
add_executable(fdb_c_unit_tests ${UNIT_TEST_SRCS}) add_executable(fdb_c_unit_tests ${UNIT_TEST_SRCS})
@ -145,10 +146,12 @@ if(NOT WIN32)
strip_debug_symbols(fdb_c_performance_test) strip_debug_symbols(fdb_c_performance_test)
strip_debug_symbols(fdb_c_ryw_benchmark) strip_debug_symbols(fdb_c_ryw_benchmark)
strip_debug_symbols(fdb_c_txn_size_test) strip_debug_symbols(fdb_c_txn_size_test)
strip_debug_symbols(fdb_c_client_memory_test)
endif() endif()
target_link_libraries(fdb_c_performance_test PRIVATE fdb_c Threads::Threads) target_link_libraries(fdb_c_performance_test PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c Threads::Threads) target_link_libraries(fdb_c_ryw_benchmark PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c Threads::Threads) target_link_libraries(fdb_c_txn_size_test PRIVATE fdb_c Threads::Threads)
target_link_libraries(fdb_c_client_memory_test PRIVATE fdb_c Threads::Threads)
add_dependencies(fdb_c_setup_tests doctest) add_dependencies(fdb_c_setup_tests doctest)
add_dependencies(fdb_c_unit_tests doctest) add_dependencies(fdb_c_unit_tests doctest)

View File

@ -835,9 +835,10 @@ extern "C" DLLEXPORT FDBResult* fdb_transaction_read_blob_granules(FDBTransactio
context.get_load_f = granule_context.get_load_f; context.get_load_f = granule_context.get_load_f;
context.free_load_f = granule_context.free_load_f; context.free_load_f = granule_context.free_load_f;
context.debugNoMaterialize = granule_context.debugNoMaterialize; context.debugNoMaterialize = granule_context.debugNoMaterialize;
context.granuleParallelism = granule_context.granuleParallelism;
Optional<Version> rv; Optional<Version> rv;
if (readVersion != invalidVersion) { rv = readVersion; } if (readVersion != latestVersion) { rv = readVersion; }
return (FDBResult*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr());); return (FDBResult*)(TXN(tr)->readBlobGranules(range, beginVersion, rv, context).extractPtr()););
} }

View File

@ -185,7 +185,12 @@ typedef struct readgranulecontext {
void* userContext; void* userContext;
/* Returns a unique id for the load. Asynchronous to support queueing multiple in parallel. */ /* Returns a unique id for the load. Asynchronous to support queueing multiple in parallel. */
int64_t (*start_load_f)(const char* filename, int filenameLength, int64_t offset, int64_t length, void* context); int64_t (*start_load_f)(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* context);
/* Returns data for the load. Pass the loadId returned by start_load_f */ /* Returns data for the load. Pass the loadId returned by start_load_f */
uint8_t* (*get_load_f)(int64_t loadId, void* context); uint8_t* (*get_load_f)(int64_t loadId, void* context);
@ -196,6 +201,9 @@ typedef struct readgranulecontext {
/* Set this to true for testing if you don't want to read the granule files, /* Set this to true for testing if you don't want to read the granule files,
just do the request to the blob workers */ just do the request to the blob workers */
fdb_bool_t debugNoMaterialize; fdb_bool_t debugNoMaterialize;
/* Number of granules to load in parallel */
int granuleParallelism;
} FDBReadBlobGranuleContext; } FDBReadBlobGranuleContext;
DLLEXPORT void fdb_future_cancel(FDBFuture* f); DLLEXPORT void fdb_future_cancel(FDBFuture* f);
@ -441,15 +449,15 @@ DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_range_split_points(F
int end_key_name_length, int end_key_name_length,
int64_t chunk_size); int64_t chunk_size);
DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTransaction* db, DLLEXPORT WARN_UNUSED_RESULT FDBFuture* fdb_transaction_get_blob_granule_ranges(FDBTransaction* tr,
uint8_t const* begin_key_name, uint8_t const* begin_key_name,
int begin_key_name_length, int begin_key_name_length,
uint8_t const* end_key_name, uint8_t const* end_key_name,
int end_key_name_length); int end_key_name_length);
/* InvalidVersion (-1) for readVersion means get read version from transaction /* LatestVersion (-2) for readVersion means get read version from transaction
Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */ Separated out as optional because BG reads can support longer-lived reads than normal FDB transactions */
DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* db, DLLEXPORT WARN_UNUSED_RESULT FDBResult* fdb_transaction_read_blob_granules(FDBTransaction* tr,
uint8_t const* begin_key_name, uint8_t const* begin_key_name,
int begin_key_name_length, int begin_key_name_length,
uint8_t const* end_key_name, uint8_t const* end_key_name,

View File

@ -0,0 +1,83 @@
/*
* client_memory_test.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define FDB_API_VERSION 710
#include <foundationdb/fdb_c.h>
#include "unit/fdb_api.hpp"
#include <thread>
#include <iostream>
#include <vector>
void fdb_check(fdb_error_t e) {
if (e) {
std::cerr << fdb_get_error(e) << std::endl;
std::abort();
}
}
FDBDatabase* fdb_open_database(const char* clusterFile) {
FDBDatabase* db;
fdb_check(fdb_create_database(clusterFile, &db));
return db;
}
int main(int argc, char** argv) {
if (argc != 2) {
printf("Usage: %s <cluster_file>", argv[0]);
}
fdb_check(fdb_select_api_version(710));
fdb_check(fdb_setup_network());
std::thread network_thread{ &fdb_run_network };
fdb_check(
fdb_network_set_option(FDBNetworkOption::FDB_NET_OPTION_TRACE_ENABLE, reinterpret_cast<const uint8_t*>(""), 0));
fdb_check(fdb_network_set_option(
FDBNetworkOption::FDB_NET_OPTION_TRACE_FORMAT, reinterpret_cast<const uint8_t*>("json"), 4));
// Use a bunch of memory from different client threads
FDBDatabase* db = fdb_open_database(argv[1]);
auto thread_func = [&]() {
fdb::Transaction tr(db);
for (int i = 0; i < 10000; ++i) {
tr.set(std::to_string(i), std::string(i, '\x00'));
}
tr.cancel();
};
std::vector<std::thread> threads;
constexpr auto kThreadCount = 64;
for (int i = 0; i < kThreadCount; ++i) {
threads.emplace_back(thread_func);
}
for (auto& thread : threads) {
thread.join();
}
fdb_database_destroy(db);
db = nullptr;
// Memory usage should go down now if the allocator is returning memory to the OS. It's expected that something is
// externally monitoring the memory usage of this process during this sleep.
using namespace std::chrono_literals;
std::this_thread::sleep_for(10s);
fdb_check(fdb_stop_network());
network_thread.join();
}

View File

@ -585,6 +585,7 @@ int64_t granule_start_load(const char* filename,
int filenameLength, int filenameLength,
int64_t offset, int64_t offset,
int64_t length, int64_t length,
int64_t fullFileLength,
void* userContext) { void* userContext) {
FILE* fp; FILE* fp;
char full_fname[PATH_MAX]; char full_fname[PATH_MAX];
@ -682,6 +683,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
granuleContext.get_load_f = &granule_get_load; granuleContext.get_load_f = &granule_get_load;
granuleContext.free_load_f = &granule_free_load; granuleContext.free_load_f = &granule_free_load;
granuleContext.debugNoMaterialize = !doMaterialize; granuleContext.debugNoMaterialize = !doMaterialize;
granuleContext.granuleParallelism = 2; // TODO make knob or setting for changing this?
r = fdb_transaction_read_blob_granules(transaction, r = fdb_transaction_read_blob_granules(transaction,
(uint8_t*)keystr, (uint8_t*)keystr,
@ -689,7 +691,7 @@ int run_op_read_blob_granules(FDBTransaction* transaction,
(uint8_t*)keystr2, (uint8_t*)keystr2,
strlen(keystr2), strlen(keystr2),
0 /* beginVersion*/, 0 /* beginVersion*/,
-1, /* endVersion. -1 is use txn read version */ -2, /* endVersion. -2 (latestVersion) is use txn read version */
granuleContext); granuleContext);
free(fileContext.data_by_id); free(fileContext.data_by_id);

View File

@ -20,6 +20,7 @@
// Unit tests for the FoundationDB C API. // Unit tests for the FoundationDB C API.
#include "fdb_c_options.g.h"
#define FDB_API_VERSION 710 #define FDB_API_VERSION 710
#include <foundationdb/fdb_c.h> #include <foundationdb/fdb_c.h>
#include <assert.h> #include <assert.h>
@ -2430,6 +2431,38 @@ TEST_CASE("Tenant create, access, and delete") {
break; break;
} }
while (1) {
StringRef begin = "\xff\xff/management/tenant_map/"_sr;
StringRef end = "\xff\xff/management/tenant_map0"_sr;
fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0));
fdb::KeyValueArrayFuture f = tr.get_range(FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(begin.begin(), begin.size()),
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL(end.begin(), end.size()),
/* limit */ 0,
/* target_bytes */ 0,
/* FDBStreamingMode */ FDB_STREAMING_MODE_WANT_ALL,
/* iteration */ 0,
/* snapshot */ false,
/* reverse */ 0);
fdb_error_t err = wait_future(f);
if (err) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
FDBKeyValue const* outKv;
int outCount;
int outMore;
fdb_check(f.get(&outKv, &outCount, &outMore));
CHECK(outCount == 1);
CHECK(StringRef(outKv->key, outKv->key_length) == StringRef(tenantName).withPrefix(begin));
tr.reset();
break;
}
fdb::Tenant tenant(db, reinterpret_cast<const uint8_t*>(tenantName.c_str()), tenantName.size()); fdb::Tenant tenant(db, reinterpret_cast<const uint8_t*>(tenantName.c_str()), tenantName.size());
fdb::Transaction tr2(tenant); fdb::Transaction tr2(tenant);
@ -2505,6 +2538,152 @@ TEST_CASE("Tenant create, access, and delete") {
} }
} }
int64_t granule_start_load_fail(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* userContext) {
CHECK(false);
return -1;
}
uint8_t* granule_get_load_fail(int64_t loadId, void* userContext) {
CHECK(false);
return nullptr;
}
void granule_free_load_fail(int64_t loadId, void* userContext) {
CHECK(false);
}
TEST_CASE("Blob Granule Functions") {
auto confValue =
get_value("\xff/conf/blob_granules_enabled", /* snapshot */ false, { FDB_TR_OPTION_READ_SYSTEM_KEYS });
if (!confValue.has_value() || confValue.value() != "1") {
return;
}
// write some data
insert_data(db, create_data({ { "bg1", "a" }, { "bg2", "b" }, { "bg3", "c" } }));
// because wiring up files is non-trivial, just test the calls complete with the expected no_materialize error
FDBReadBlobGranuleContext granuleContext;
granuleContext.userContext = nullptr;
granuleContext.start_load_f = &granule_start_load_fail;
granuleContext.get_load_f = &granule_get_load_fail;
granuleContext.free_load_f = &granule_free_load_fail;
granuleContext.debugNoMaterialize = true;
granuleContext.granuleParallelism = 1;
// dummy values
FDBKeyValue const* out_kv;
int out_count;
int out_more;
fdb::Transaction tr(db);
int64_t originalReadVersion = -1;
// test no materialize gets error but completes, save read version
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
// -2 is latest version
fdb::KeyValueArrayResult r = tr.read_blob_granules(key("bg"), key("bh"), 0, -2, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
// If read done, save read version. Should have already used read version so this shouldn't error
fdb::Int64Future grvFuture = tr.get_read_version();
fdb_error_t grvErr = wait_future(grvFuture);
CHECK(!grvErr);
CHECK(!grvFuture.get(&originalReadVersion));
CHECK(originalReadVersion > 0);
tr.reset();
break;
}
// test with begin version > 0
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
// -2 is latest version, read version should be >= originalReadVersion
fdb::KeyValueArrayResult r =
tr.read_blob_granules(key("bg"), key("bh"), originalReadVersion, -2, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
;
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
tr.reset();
break;
}
// test with prior read version completes after delay larger than normal MVC window
// TODO: should we not do this?
std::this_thread::sleep_for(std::chrono::milliseconds(6000));
while (1) {
fdb_check(tr.set_option(FDB_TR_OPTION_READ_YOUR_WRITES_DISABLE, nullptr, 0));
fdb::KeyValueArrayResult r =
tr.read_blob_granules(key("bg"), key("bh"), 0, originalReadVersion, granuleContext);
fdb_error_t err = r.get(&out_kv, &out_count, &out_more);
if (err && err != 2037 /* blob_granule_not_materialized */) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
CHECK(err == 2037 /* blob_granule_not_materialized */);
tr.reset();
break;
}
// test ranges
while (1) {
fdb::KeyRangeArrayFuture f = tr.get_blob_granule_ranges(key("bg"), key("bh"));
fdb_error_t err = wait_future(f);
if (err) {
fdb::EmptyFuture f2 = tr.on_error(err);
fdb_check(wait_future(f2));
continue;
}
const FDBKeyRange* out_kr;
int out_count;
fdb_check(f.get(&out_kr, &out_count));
CHECK(out_count >= 1);
// check key ranges are in order
for (int i = 0; i < out_count; i++) {
// key range start < end
CHECK(std::string((const char*)out_kr[i].begin_key, out_kr[i].begin_key_length) <
std::string((const char*)out_kr[i].end_key, out_kr[i].end_key_length));
}
// Ranges themselves are sorted
for (int i = 0; i < out_count - 1; i++) {
CHECK(std::string((const char*)out_kr[i].end_key, out_kr[i].end_key_length) <=
std::string((const char*)out_kr[i + 1].begin_key, out_kr[i + 1].begin_key_length));
}
tr.reset();
break;
}
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
if (argc < 3) { if (argc < 3) {
std::cout << "Unit tests for the FoundationDB C API.\n" std::cout << "Unit tests for the FoundationDB C API.\n"

View File

@ -212,6 +212,17 @@ endif()
set(COROUTINE_IMPL ${DEFAULT_COROUTINE_IMPL} CACHE STRING "Which coroutine implementation to use. Options are boost and libcoro") set(COROUTINE_IMPL ${DEFAULT_COROUTINE_IMPL} CACHE STRING "Which coroutine implementation to use. Options are boost and libcoro")
################################################################################
# AWS SDK
################################################################################
set(BUILD_AWS_BACKUP OFF CACHE BOOL "Build AWS S3 SDK backup client")
if (BUILD_AWS_BACKUP)
set(WITH_AWS_BACKUP ON)
else()
set(WITH_AWS_BACKUP OFF)
endif()
################################################################################ ################################################################################
file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/packages) file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/packages)
@ -232,6 +243,7 @@ function(print_components)
message(STATUS "Build Python sdist (make package): ${WITH_PYTHON_BINDING}") message(STATUS "Build Python sdist (make package): ${WITH_PYTHON_BINDING}")
message(STATUS "Configure CTest (depends on Python): ${WITH_PYTHON}") message(STATUS "Configure CTest (depends on Python): ${WITH_PYTHON}")
message(STATUS "Build with RocksDB: ${WITH_ROCKSDB_EXPERIMENTAL}") message(STATUS "Build with RocksDB: ${WITH_ROCKSDB_EXPERIMENTAL}")
message(STATUS "Build with AWS SDK: ${WITH_AWS_BACKUP}")
message(STATUS "=========================================") message(STATUS "=========================================")
endfunction() endfunction()

98
cmake/awssdk.cmake Normal file
View File

@ -0,0 +1,98 @@
project(awssdk-download NONE)
# Compile the sdk with clang and libc++, since otherwise we get libc++ vs libstdc++ link errors when compiling fdb with clang
set(AWSSDK_COMPILER_FLAGS "")
set(AWSSDK_LINK_FLAGS "")
if(APPLE OR CLANG OR USE_LIBCXX)
set(AWSSDK_COMPILER_FLAGS -stdlib=libc++ -nostdlib++)
set(AWSSDK_LINK_FLAGS -stdlib=libc++ -lc++abi)
endif()
include(ExternalProject)
ExternalProject_Add(awssdk_project
GIT_REPOSITORY https://github.com/aws/aws-sdk-cpp.git
GIT_TAG 2af3ce543c322cb259471b3b090829464f825972 # v1.9.200
SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/awssdk-src"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build"
GIT_CONFIG advice.detachedHead=false
CMAKE_ARGS -DBUILD_SHARED_LIBS=OFF # SDK builds shared libs by default, we want static libs
-DENABLE_TESTING=OFF
-DBUILD_ONLY=core # git repo contains SDK for every AWS product, we only want the core auth libraries
-DSIMPLE_INSTALL=ON
-DCMAKE_INSTALL_PREFIX=install # need to specify an install prefix so it doesn't install in /usr/lib - FIXME: use absolute path
-DBYO_CRYPTO=ON # we have our own crypto libraries that conflict if we let aws sdk build and link its own
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_EXE_LINKER_FLAGS=${AWSSDK_COMPILER_FLAGS}
-DCMAKE_CXX_FLAGS=${AWSSDK_LINK_FLAGS}
TEST_COMMAND ""
BUILD_ALWAYS TRUE
# the sdk build produces a ton of artifacts, with their own dependency tree, so there is a very specific dependency order they must be linked in
BUILD_BYPRODUCTS "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-cpp-sdk-core.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-crt-cpp.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-s3.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-auth.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-event-stream.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-http.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-mqtt.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-io.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-checksums.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-compression.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-cal.a"
"${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-common.a"
)
add_library(awssdk_core STATIC IMPORTED)
add_dependencies(awssdk_core awssdk_project)
set_target_properties(awssdk_core PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-cpp-sdk-core.a")
add_library(awssdk_crt STATIC IMPORTED)
add_dependencies(awssdk_crt awssdk_project)
set_target_properties(awssdk_crt PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-crt-cpp.a")
# TODO: can we remove c_s3? It seems to be a dependency of libaws-crt
add_library(awssdk_c_s3 STATIC IMPORTED)
add_dependencies(awssdk_c_s3 awssdk_project)
set_target_properties(awssdk_c_s3 PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-s3.a")
add_library(awssdk_c_auth STATIC IMPORTED)
add_dependencies(awssdk_c_auth awssdk_project)
set_target_properties(awssdk_c_auth PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-auth.a")
add_library(awssdk_c_eventstream STATIC IMPORTED)
add_dependencies(awssdk_c_eventstream awssdk_project)
set_target_properties(awssdk_c_eventstream PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-event-stream.a")
add_library(awssdk_c_http STATIC IMPORTED)
add_dependencies(awssdk_c_http awssdk_project)
set_target_properties(awssdk_c_http PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-http.a")
add_library(awssdk_c_mqtt STATIC IMPORTED)
add_dependencies(awssdk_c_mqtt awssdk_project)
set_target_properties(awssdk_c_mqtt PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-mqtt.a")
add_library(awssdk_c_io STATIC IMPORTED)
add_dependencies(awssdk_c_io awssdk_project)
set_target_properties(awssdk_c_io PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-io.a")
add_library(awssdk_checksums STATIC IMPORTED)
add_dependencies(awssdk_checksums awssdk_project)
set_target_properties(awssdk_checksums PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-checksums.a")
add_library(awssdk_c_compression STATIC IMPORTED)
add_dependencies(awssdk_c_compression awssdk_project)
set_target_properties(awssdk_c_compression PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-compression.a")
add_library(awssdk_c_cal STATIC IMPORTED)
add_dependencies(awssdk_c_cal awssdk_project)
set_target_properties(awssdk_c_cal PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-cal.a")
add_library(awssdk_c_common STATIC IMPORTED)
add_dependencies(awssdk_c_common awssdk_project)
set_target_properties(awssdk_c_common PROPERTIES IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/lib64/libaws-c-common.a")
# link them all together in one interface target
add_library(awssdk_target INTERFACE)
target_include_directories(awssdk_target SYSTEM INTERFACE ${CMAKE_CURRENT_BINARY_DIR}/awssdk-build/install/include)
target_link_libraries(awssdk_target INTERFACE awssdk_core awssdk_crt awssdk_c_s3 awssdk_c_auth awssdk_c_eventstream awssdk_c_http awssdk_c_mqtt awssdk_c_io awssdk_checksums awssdk_c_compression awssdk_c_cal awssdk_c_common curl)

View File

@ -238,7 +238,7 @@ ACTOR Future<Void> echoClient() {
return Void(); return Void();
} }
struct SimpleKeyValueStoreInteface { struct SimpleKeyValueStoreInterface {
constexpr static FileIdentifier file_identifier = 8226647; constexpr static FileIdentifier file_identifier = 8226647;
RequestStream<struct GetKVInterface> connect; RequestStream<struct GetKVInterface> connect;
RequestStream<struct GetRequest> get; RequestStream<struct GetRequest> get;
@ -253,7 +253,7 @@ struct SimpleKeyValueStoreInteface {
struct GetKVInterface { struct GetKVInterface {
constexpr static FileIdentifier file_identifier = 8062308; constexpr static FileIdentifier file_identifier = 8062308;
ReplyPromise<SimpleKeyValueStoreInteface> reply; ReplyPromise<SimpleKeyValueStoreInterface> reply;
template <class Ar> template <class Ar>
void serialize(Ar& ar) { void serialize(Ar& ar) {
@ -297,7 +297,7 @@ struct ClearRequest {
}; };
ACTOR Future<Void> kvStoreServer() { ACTOR Future<Void> kvStoreServer() {
state SimpleKeyValueStoreInteface inf; state SimpleKeyValueStoreInterface inf;
state std::map<std::string, std::string> store; state std::map<std::string, std::string> store;
inf.connect.makeWellKnownEndpoint(WLTOKEN_SIMPLE_KV_SERVER, TaskPriority::DefaultEndpoint); inf.connect.makeWellKnownEndpoint(WLTOKEN_SIMPLE_KV_SERVER, TaskPriority::DefaultEndpoint);
loop { loop {
@ -333,17 +333,17 @@ ACTOR Future<Void> kvStoreServer() {
} }
} }
ACTOR Future<SimpleKeyValueStoreInteface> connect() { ACTOR Future<SimpleKeyValueStoreInterface> connect() {
std::cout << format("%llu: Connect...\n", uint64_t(g_network->now())); std::cout << format("%llu: Connect...\n", uint64_t(g_network->now()));
SimpleKeyValueStoreInteface c; SimpleKeyValueStoreInterface c;
c.connect = RequestStream<GetKVInterface>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_SIMPLE_KV_SERVER)); c.connect = RequestStream<GetKVInterface>(Endpoint::wellKnown({ serverAddress }, WLTOKEN_SIMPLE_KV_SERVER));
SimpleKeyValueStoreInteface result = wait(c.connect.getReply(GetKVInterface())); SimpleKeyValueStoreInterface result = wait(c.connect.getReply(GetKVInterface()));
std::cout << format("%llu: done..\n", uint64_t(g_network->now())); std::cout << format("%llu: done..\n", uint64_t(g_network->now()));
return result; return result;
} }
ACTOR Future<Void> kvSimpleClient() { ACTOR Future<Void> kvSimpleClient() {
state SimpleKeyValueStoreInteface server = wait(connect()); state SimpleKeyValueStoreInterface server = wait(connect());
std::cout << format("Set %s -> %s\n", "foo", "bar"); std::cout << format("Set %s -> %s\n", "foo", "bar");
SetRequest setRequest; SetRequest setRequest;
setRequest.key = "foo"; setRequest.key = "foo";
@ -356,7 +356,7 @@ ACTOR Future<Void> kvSimpleClient() {
return Void(); return Void();
} }
ACTOR Future<Void> kvClient(SimpleKeyValueStoreInteface server, std::shared_ptr<uint64_t> ops) { ACTOR Future<Void> kvClient(SimpleKeyValueStoreInterface server, std::shared_ptr<uint64_t> ops) {
state Future<Void> timeout = delay(20); state Future<Void> timeout = delay(20);
state int rangeSize = 2 << 12; state int rangeSize = 2 << 12;
loop { loop {
@ -397,7 +397,7 @@ ACTOR Future<Void> throughputMeasurement(std::shared_ptr<uint64_t> operations) {
} }
ACTOR Future<Void> multipleClients() { ACTOR Future<Void> multipleClients() {
SimpleKeyValueStoreInteface server = wait(connect()); SimpleKeyValueStoreInterface server = wait(connect());
auto ops = std::make_shared<uint64_t>(0); auto ops = std::make_shared<uint64_t>(0);
std::vector<Future<Void>> clients(100); std::vector<Future<Void>> clients(100);
for (auto& f : clients) { for (auto& f : clients) {

View File

@ -18,9 +18,12 @@
* limitations under the License. * limitations under the License.
*/ */
#include <vector>
#include "contrib/fmt-8.1.1/include/fmt/format.h" #include "contrib/fmt-8.1.1/include/fmt/format.h"
#include "flow/serialize.h" #include "flow/serialize.h"
#include "fdbclient/BlobGranuleFiles.h" #include "fdbclient/BlobGranuleFiles.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/SystemData.h" // for allKeys unit test - could remove #include "fdbclient/SystemData.h" // for allKeys unit test - could remove
#include "flow/UnitTest.h" #include "flow/UnitTest.h"
@ -225,50 +228,85 @@ RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
return ret; return ret;
} }
struct GranuleLoadIds {
Optional<int64_t> snapshotId;
std::vector<int64_t> deltaIds;
};
static void startLoad(const ReadBlobGranuleContext granuleContext,
const BlobGranuleChunkRef& chunk,
GranuleLoadIds& loadIds) {
// Start load process for all files in chunk
if (chunk.snapshotFile.present()) {
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
// FIXME: full file length won't always be length of read
loadIds.snapshotId = granuleContext.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
chunk.snapshotFile.get().length,
granuleContext.userContext);
}
loadIds.deltaIds.reserve(chunk.deltaFiles.size());
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
int64_t deltaLoadId = granuleContext.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
chunk.deltaFiles[deltaFileIdx].length,
granuleContext.userContext);
loadIds.deltaIds.push_back(deltaLoadId);
}
}
ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files, ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<BlobGranuleChunkRef>>& files,
const KeyRangeRef& keyRange, const KeyRangeRef& keyRange,
Version beginVersion, Version beginVersion,
Version readVersion, Version readVersion,
ReadBlobGranuleContext granuleContext) { ReadBlobGranuleContext granuleContext) {
int64_t parallelism = granuleContext.granuleParallelism;
if (parallelism < 1) {
parallelism = 1;
}
if (parallelism >= CLIENT_KNOBS->BG_MAX_GRANULE_PARALLELISM) {
parallelism = CLIENT_KNOBS->BG_MAX_GRANULE_PARALLELISM;
}
GranuleLoadIds loadIds[files.size()];
// Kick off first file reads if parallelism > 1
for (int i = 0; i < parallelism - 1 && i < files.size(); i++) {
startLoad(granuleContext, files[i], loadIds[i]);
}
try { try {
RangeResult results; RangeResult results;
// FIXME: could submit multiple chunks to start_load_f in parallel? for (int chunkIdx = 0; chunkIdx < files.size(); chunkIdx++) {
for (const BlobGranuleChunkRef& chunk : files) { // Kick off files for this granule if parallelism == 1, or future granule if parallelism > 1
if (chunkIdx + parallelism - 1 < files.size()) {
startLoad(granuleContext, files[chunkIdx + parallelism - 1], loadIds[chunkIdx + parallelism - 1]);
}
RangeResult chunkRows; RangeResult chunkRows;
int64_t snapshotLoadId;
int64_t deltaLoadIds[chunk.deltaFiles.size()];
// Start load process for all files in chunk
// In V1 of api snapshot is required, optional is just for forward compatibility
ASSERT(chunk.snapshotFile.present());
std::string snapshotFname = chunk.snapshotFile.get().filename.toString();
snapshotLoadId = granuleContext.start_load_f(snapshotFname.c_str(),
snapshotFname.size(),
chunk.snapshotFile.get().offset,
chunk.snapshotFile.get().length,
granuleContext.userContext);
int64_t deltaLoadLengths[chunk.deltaFiles.size()];
StringRef deltaData[chunk.deltaFiles.size()];
for (int deltaFileIdx = 0; deltaFileIdx < chunk.deltaFiles.size(); deltaFileIdx++) {
std::string deltaFName = chunk.deltaFiles[deltaFileIdx].filename.toString();
deltaLoadIds[deltaFileIdx] = granuleContext.start_load_f(deltaFName.c_str(),
deltaFName.size(),
chunk.deltaFiles[deltaFileIdx].offset,
chunk.deltaFiles[deltaFileIdx].length,
granuleContext.userContext);
deltaLoadLengths[deltaFileIdx] = chunk.deltaFiles[deltaFileIdx].length;
}
// once all loads kicked off, load data for chunk // once all loads kicked off, load data for chunk
StringRef snapshotData(granuleContext.get_load_f(snapshotLoadId, granuleContext.userContext), Optional<StringRef> snapshotData;
chunk.snapshotFile.get().length); if (files[chunkIdx].snapshotFile.present()) {
if (!snapshotData.begin()) { snapshotData =
return ErrorOr<RangeResult>(blob_granule_file_load_error()); StringRef(granuleContext.get_load_f(loadIds[chunkIdx].snapshotId.get(), granuleContext.userContext),
files[chunkIdx].snapshotFile.get().length);
if (!snapshotData.get().begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
} }
for (int i = 0; i < chunk.deltaFiles.size(); i++) {
deltaData[i] = StringRef(granuleContext.get_load_f(deltaLoadIds[i], granuleContext.userContext), StringRef deltaData[files[chunkIdx].deltaFiles.size()];
chunk.deltaFiles[i].length); for (int i = 0; i < files[chunkIdx].deltaFiles.size(); i++) {
deltaData[i] =
StringRef(granuleContext.get_load_f(loadIds[chunkIdx].deltaIds[i], granuleContext.userContext),
files[chunkIdx].deltaFiles[i].length);
// null data is error // null data is error
if (!deltaData[i].begin()) { if (!deltaData[i].begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error()); return ErrorOr<RangeResult>(blob_granule_file_load_error());
@ -276,14 +314,17 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
} }
// materialize rows from chunk // materialize rows from chunk
chunkRows = materializeBlobGranule(chunk, keyRange, beginVersion, readVersion, snapshotData, deltaData); chunkRows =
materializeBlobGranule(files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData);
results.arena().dependsOn(chunkRows.arena()); results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size()); results.append(results.arena(), chunkRows.begin(), chunkRows.size());
granuleContext.free_load_f(snapshotLoadId, granuleContext.userContext); if (loadIds[chunkIdx].snapshotId.present()) {
for (int i = 0; i < chunk.deltaFiles.size(); i++) { granuleContext.free_load_f(loadIds[chunkIdx].snapshotId.get(), granuleContext.userContext);
granuleContext.free_load_f(deltaLoadIds[i], granuleContext.userContext); }
for (int i = 0; i < loadIds[chunkIdx].deltaIds.size(); i++) {
granuleContext.free_load_f(loadIds[chunkIdx].deltaIds[i], granuleContext.userContext);
} }
} }
return ErrorOr<RangeResult>(results); return ErrorOr<RangeResult>(results);

View File

@ -205,6 +205,17 @@ if(BUILD_AZURE_BACKUP)
) )
endif() endif()
if(WITH_AWS_BACKUP)
add_compile_definitions(BUILD_AWS_BACKUP)
set(FDBCLIENT_SRCS
${FDBCLIENT_SRCS}
FDBAWSCredentialsProvider.h)
include(awssdk)
endif()
add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs}) add_flow_target(STATIC_LIBRARY NAME fdbclient SRCS ${FDBCLIENT_SRCS} ADDL_SRCS ${options_srcs})
add_dependencies(fdbclient fdboptions) add_dependencies(fdbclient fdboptions)
target_link_libraries(fdbclient PUBLIC fdbrpc msgpack) target_link_libraries(fdbclient PUBLIC fdbrpc msgpack)
@ -224,3 +235,8 @@ if(BUILD_AZURE_BACKUP)
target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite) target_link_libraries(fdbclient PRIVATE curl uuid azure-storage-lite)
target_link_libraries(fdbclient_sampling PRIVATE curl uuid azure-storage-lite) target_link_libraries(fdbclient_sampling PRIVATE curl uuid azure-storage-lite)
endif() endif()
if(BUILD_AWS_BACKUP)
target_link_libraries(fdbclient PUBLIC awssdk_target)
target_link_libraries(fdbclient_sampling PUBLIC awssdk_target)
endif()

View File

@ -50,6 +50,7 @@ void ClientKnobs::initialize(Randomize randomize) {
init( MAX_GENERATIONS_OVERRIDE, 0 ); init( MAX_GENERATIONS_OVERRIDE, 0 );
init( MAX_GENERATIONS_SIM, 50 ); //Disable network connections after this many generations in simulation, should be less than RECOVERY_DELAY_START_GENERATION init( MAX_GENERATIONS_SIM, 50 ); //Disable network connections after this many generations in simulation, should be less than RECOVERY_DELAY_START_GENERATION
init( COORDINATOR_HOSTNAME_RESOLVE_DELAY, 0.05 );
init( COORDINATOR_RECONNECTION_DELAY, 1.0 ); init( COORDINATOR_RECONNECTION_DELAY, 1.0 );
init( CLIENT_EXAMPLE_AMOUNT, 20 ); init( CLIENT_EXAMPLE_AMOUNT, 20 );
init( MAX_CLIENT_STATUS_AGE, 1.0 ); init( MAX_CLIENT_STATUS_AGE, 1.0 );
@ -280,6 +281,9 @@ void ClientKnobs::initialize(Randomize randomize) {
init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 ); init( MVC_CLIENTLIB_CHUNK_SIZE, 8*1024 );
init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 ); init( MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION, 32 );
// Blob granules
init( BG_MAX_GRANULE_PARALLELISM, 10 );
// clang-format on // clang-format on
} }

View File

@ -49,6 +49,7 @@ public:
double MAX_GENERATIONS_OVERRIDE; double MAX_GENERATIONS_OVERRIDE;
double MAX_GENERATIONS_SIM; double MAX_GENERATIONS_SIM;
double COORDINATOR_HOSTNAME_RESOLVE_DELAY;
double COORDINATOR_RECONNECTION_DELAY; double COORDINATOR_RECONNECTION_DELAY;
int CLIENT_EXAMPLE_AMOUNT; int CLIENT_EXAMPLE_AMOUNT;
double MAX_CLIENT_STATUS_AGE; double MAX_CLIENT_STATUS_AGE;
@ -272,6 +273,9 @@ public:
int MVC_CLIENTLIB_CHUNK_SIZE; int MVC_CLIENTLIB_CHUNK_SIZE;
int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION; int MVC_CLIENTLIB_CHUNKS_PER_TRANSACTION;
// Blob Granules
int BG_MAX_GRANULE_PARALLELISM;
ClientKnobs(Randomize randomize); ClientKnobs(Randomize randomize);
void initialize(Randomize randomize); void initialize(Randomize randomize);
}; };

View File

@ -514,7 +514,7 @@ public:
Counter transactionGrvTimedOutBatches; Counter transactionGrvTimedOutBatches;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit,
bytesPerCommit; bytesPerCommit, bgLatencies, bgGranulesPerRequest;
int outstandingWatches; int outstandingWatches;
int maxOutstandingWatches; int maxOutstandingWatches;
@ -538,6 +538,7 @@ public:
bool transactionTracingSample; bool transactionTracingSample;
double verifyCausalReadsProp = 0.0; double verifyCausalReadsProp = 0.0;
bool blobGranuleNoMaterialize = false; bool blobGranuleNoMaterialize = false;
bool anyBlobGranuleRequests = false;
Future<Void> logger; Future<Void> logger;
Future<Void> throttleExpirer; Future<Void> throttleExpirer;

View File

@ -0,0 +1,47 @@
/*
* FDBAWSCredentialsProvider.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if (!defined FDB_AWS_CREDENTIALS_PROVIDER_H) && (defined BUILD_AWS_BACKUP)
#define FDB_AWS_CREDENTIALS_PROVIDER_H
#pragma once
#include "aws/core/Aws.h"
#include "aws/core/auth/AWSCredentialsProviderChain.h"
// Singleton
namespace FDBAWSCredentialsProvider {
bool doneInit = false;
// You're supposed to call AWS::ShutdownAPI(options); once done
// But we want this to live for the lifetime of the process, so we don't do that
static Aws::Auth::AWSCredentials getAwsCredentials() {
if (!doneInit) {
doneInit = true;
Aws::SDKOptions options;
Aws::InitAPI(options);
TraceEvent("AWSSDKInitSuccessful");
}
Aws::Auth::DefaultAWSCredentialsProviderChain credProvider;
Aws::Auth::AWSCredentials creds = credProvider.GetAWSCredentials();
return creds;
}
} // namespace FDBAWSCredentialsProvider
#endif

View File

@ -1342,7 +1342,12 @@ struct ReadBlobGranuleContext {
void* userContext; void* userContext;
// Returns a unique id for the load. Asynchronous to support queueing multiple in parallel. // Returns a unique id for the load. Asynchronous to support queueing multiple in parallel.
int64_t (*start_load_f)(const char* filename, int filenameLength, int64_t offset, int64_t length, void* context); int64_t (*start_load_f)(const char* filename,
int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* context);
// Returns data for the load. Pass the loadId returned by start_load_f // Returns data for the load. Pass the loadId returned by start_load_f
uint8_t* (*get_load_f)(int64_t loadId, void* context); uint8_t* (*get_load_f)(int64_t loadId, void* context);
@ -1353,6 +1358,9 @@ struct ReadBlobGranuleContext {
// Set this to true for testing if you don't want to read the granule files, // Set this to true for testing if you don't want to read the granule files,
// just do the request to the blob workers // just do the request to the blob workers
bool debugNoMaterialize; bool debugNoMaterialize;
// number of granules to load in parallel (default 1)
int granuleParallelism = 1;
}; };
// Store metadata associated with each storage server. Now it only contains data be used in perpetual storage wiggle. // Store metadata associated with each storage server. Now it only contains data be used in perpetual storage wiggle.

View File

@ -169,7 +169,7 @@ void ClusterConnectionString::resolveHostnamesBlocking() {
} }
void ClusterConnectionString::resetToUnresolved() { void ClusterConnectionString::resetToUnresolved() {
if (hostnames.size() > 0) { if (status == RESOLVED && hostnames.size() > 0) {
coords.clear(); coords.clear();
hostnames.clear(); hostnames.clear();
networkAddressToHostname.clear(); networkAddressToHostname.clear();
@ -558,8 +558,8 @@ ACTOR Future<Void> monitorNominee(Key key,
.detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname") .detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname")
.detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString()); .detail("OldAddr", coord.getLeader.getEndpoint().getPrimaryAddress().toString());
if (rep.getError().code() == error_code_request_maybe_delivered) { if (rep.getError().code() == error_code_request_maybe_delivered) {
// 50 milliseconds delay to prevent tight resolving loop due to outdated DNS cache // Delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(0.05)); wait(delay(CLIENT_KNOBS->COORDINATOR_HOSTNAME_RESOLVE_DELAY));
throw coordinators_changed(); throw coordinators_changed();
} else { } else {
throw rep.getError(); throw rep.getError();
@ -589,7 +589,6 @@ ACTOR Future<Void> monitorNominee(Key key,
if (li.present() && li.get().forward) if (li.present() && li.get().forward)
wait(Future<Void>(Never())); wait(Future<Void>(Never()));
wait(Future<Void>(Void()));
} }
} }
} }

View File

@ -282,8 +282,9 @@ ThreadResult<RangeResult> DLTransaction::readBlobGranules(const KeyRangeRef& key
context.get_load_f = granuleContext.get_load_f; context.get_load_f = granuleContext.get_load_f;
context.free_load_f = granuleContext.free_load_f; context.free_load_f = granuleContext.free_load_f;
context.debugNoMaterialize = granuleContext.debugNoMaterialize; context.debugNoMaterialize = granuleContext.debugNoMaterialize;
context.granuleParallelism = granuleContext.granuleParallelism;
int64_t rv = readVersion.present() ? readVersion.get() : invalidVersion; int64_t rv = readVersion.present() ? readVersion.get() : latestVersion;
FdbCApi::FDBResult* r = api->transactionReadBlobGranules(tr, FdbCApi::FDBResult* r = api->transactionReadBlobGranules(tr,
keyRange.begin.begin(), keyRange.begin.begin(),

View File

@ -95,8 +95,12 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
void* userContext; void* userContext;
// Returns a unique id for the load. Asynchronous to support queueing multiple in parallel. // Returns a unique id for the load. Asynchronous to support queueing multiple in parallel.
int64_t ( int64_t (*start_load_f)(const char* filename,
*start_load_f)(const char* filename, int filenameLength, int64_t offset, int64_t length, void* context); int filenameLength,
int64_t offset,
int64_t length,
int64_t fullFileLength,
void* context);
// Returns data for the load. Pass the loadId returned by start_load_f // Returns data for the load. Pass the loadId returned by start_load_f
uint8_t* (*get_load_f)(int64_t loadId, void* context); uint8_t* (*get_load_f)(int64_t loadId, void* context);
@ -107,6 +111,9 @@ struct FdbCApi : public ThreadSafeReferenceCounted<FdbCApi> {
// set this to true for testing if you don't want to read the granule files, just // set this to true for testing if you don't want to read the granule files, just
// do the request to the blob workers // do the request to the blob workers
fdb_bool_t debugNoMaterialize; fdb_bool_t debugNoMaterialize;
// number of granules to load in parallel (default 1)
int granuleParallelism;
} FDBReadBlobGranuleContext; } FDBReadBlobGranuleContext;
typedef void (*FDBCallback)(FDBFuture* future, void* callback_parameter); typedef void (*FDBCallback)(FDBFuture* future, void* callback_parameter);

View File

@ -533,6 +533,14 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
.detail("MedianBytesPerCommit", cx->bytesPerCommit.median()) .detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
.detail("MaxBytesPerCommit", cx->bytesPerCommit.max()) .detail("MaxBytesPerCommit", cx->bytesPerCommit.max())
.detail("NumLocalityCacheEntries", cx->locationCache.size()); .detail("NumLocalityCacheEntries", cx->locationCache.size());
if (cx->anyBlobGranuleRequests) {
ev.detail("MeanBGLatency", cx->bgLatencies.mean())
.detail("MedianBGLatency", cx->bgLatencies.median())
.detail("MaxBGLatency", cx->bgLatencies.max())
.detail("MeanBGGranulesPerRequest", cx->bgGranulesPerRequest.mean())
.detail("MedianBGGranulesPerRequest", cx->bgGranulesPerRequest.median())
.detail("MaxBGGranulesPerRequest", cx->bgGranulesPerRequest.max());
}
} }
cx->latencies.clear(); cx->latencies.clear();
@ -541,6 +549,8 @@ ACTOR Future<Void> databaseLogger(DatabaseContext* cx) {
cx->commitLatencies.clear(); cx->commitLatencies.clear();
cx->mutationsPerCommit.clear(); cx->mutationsPerCommit.clear();
cx->bytesPerCommit.clear(); cx->bytesPerCommit.clear();
cx->bgLatencies.clear();
cx->bgGranulesPerRequest.clear();
lastLogged = now(); lastLogged = now();
} }
@ -1353,11 +1363,11 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc), transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), outstandingWatches(0), lastGrvTime(0.0), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), outstandingWatches(0), lastGrvTime(0.0),
lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0), transactionTracingSample(false), taskID(taskID), cachedReadVersion(0), lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), coordinator(coordinator), apiVersion(apiVersion), transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
mvCacheInsertLocation(0), healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), coordinator(coordinator), apiVersion(apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT), detailedHealthMetricsLastUpdated(0), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)), specialKeySpace(std::make_unique<SpecialKeySpace>(specialKeys.begin, specialKeys.end, /* test */ false)),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) { connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {
dbId = deterministicRandom()->randomUniqueID(); dbId = deterministicRandom()->randomUniqueID();
@ -1619,7 +1629,8 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc), transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc), transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000),
bytesPerCommit(1000), transactionTracingSample(false), smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000), transactionTracingSample(false),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {} connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
// Static constructor used by server processes to create a DatabaseContext // Static constructor used by server processes to create a DatabaseContext
@ -7340,6 +7351,7 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
state Version rv; state Version rv;
state Standalone<VectorRef<BlobGranuleChunkRef>> results; state Standalone<VectorRef<BlobGranuleChunkRef>> results;
state double startTime = now();
if (read.present()) { if (read.present()) {
rv = read.get(); rv = read.get();
@ -7515,6 +7527,11 @@ ACTOR Future<Standalone<VectorRef<BlobGranuleChunkRef>>> readBlobGranulesActor(
throw e; throw e;
} }
} }
self->trState->cx->anyBlobGranuleRequests = true;
self->trState->cx->bgGranulesPerRequest.addSample(results.size());
self->trState->cx->bgLatencies.addSample(now() - startTime);
if (readVersionOut != nullptr) { if (readVersionOut != nullptr) {
*readVersionOut = rv; *readVersionOut = rv;
} }
@ -8690,11 +8707,24 @@ ACTOR Future<Void> singleChangeFeedStreamInternal(KeyRange range,
results->lastReturnedVersion.set(feedReply.mutations.back().version); results->lastReturnedVersion.set(feedReply.mutations.back().version);
} }
if (refresh.canBeSet() && !atLatest && feedReply.atLatestVersion) { if (!refresh.canBeSet()) {
try {
// refresh is set if and only if this actor is cancelled
wait(Future<Void>(Void()));
// Catch any unexpected behavior if the above contract is broken
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
throw;
}
}
if (!atLatest && feedReply.atLatestVersion) {
atLatest = true; atLatest = true;
results->notAtLatest.set(0); results->notAtLatest.set(0);
} }
if (refresh.canBeSet() && feedReply.minStreamVersion > results->storageData[0]->version.get()) {
if (feedReply.minStreamVersion > results->storageData[0]->version.get()) {
results->storageData[0]->version.set(feedReply.minStreamVersion); results->storageData[0]->version.set(feedReply.minStreamVersion);
} }
} }

View File

@ -34,6 +34,8 @@
#include "fdbrpc/IAsyncFile.h" #include "fdbrpc/IAsyncFile.h"
#include "flow/UnitTest.h" #include "flow/UnitTest.h"
#include "fdbclient/rapidxml/rapidxml.hpp" #include "fdbclient/rapidxml/rapidxml.hpp"
#include "fdbclient/FDBAWSCredentialsProvider.h"
#include "flow/actorcompiler.h" // has to be last include #include "flow/actorcompiler.h" // has to be last include
using namespace rapidxml; using namespace rapidxml;
@ -82,6 +84,7 @@ S3BlobStoreEndpoint::BlobKnobs::BlobKnobs() {
read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE; read_cache_blocks_per_file = CLIENT_KNOBS->BLOBSTORE_READ_CACHE_BLOCKS_PER_FILE;
max_send_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_SEND_BYTES_PER_SECOND; max_send_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_SEND_BYTES_PER_SECOND;
max_recv_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_RECV_BYTES_PER_SECOND; max_recv_bytes_per_second = CLIENT_KNOBS->BLOBSTORE_MAX_RECV_BYTES_PER_SECOND;
sdk_auth = false;
} }
bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) { bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
@ -118,6 +121,7 @@ bool S3BlobStoreEndpoint::BlobKnobs::set(StringRef name, int value) {
TRY_PARAM(read_cache_blocks_per_file, rcb); TRY_PARAM(read_cache_blocks_per_file, rcb);
TRY_PARAM(max_send_bytes_per_second, sbps); TRY_PARAM(max_send_bytes_per_second, sbps);
TRY_PARAM(max_recv_bytes_per_second, rbps); TRY_PARAM(max_recv_bytes_per_second, rbps);
TRY_PARAM(sdk_auth, sa);
#undef TRY_PARAM #undef TRY_PARAM
return false; return false;
} }
@ -506,7 +510,38 @@ ACTOR Future<Optional<json_spirit::mObject>> tryReadJSONFile(std::string path) {
return Optional<json_spirit::mObject>(); return Optional<json_spirit::mObject>();
} }
// If the credentials expire, the connection will eventually fail and be discarded from the pool, and then a new
// connection will be constructed, which will call this again to get updated credentials
static S3BlobStoreEndpoint::Credentials getSecretSdk() {
#ifdef BUILD_AWS_BACKUP
double elapsed = -timer_monotonic();
Aws::Auth::AWSCredentials awsCreds = FDBAWSCredentialsProvider::getAwsCredentials();
elapsed += timer_monotonic();
if (awsCreds.IsEmpty()) {
TraceEvent(SevWarn, "S3BlobStoreAWSCredsEmpty");
throw backup_auth_missing();
}
S3BlobStoreEndpoint::Credentials fdbCreds;
fdbCreds.key = awsCreds.GetAWSAccessKeyId();
fdbCreds.secret = awsCreds.GetAWSSecretKey();
fdbCreds.securityToken = awsCreds.GetSessionToken();
TraceEvent("S3BlobStoreGotSdkCredentials").suppressFor(60).detail("Duration", elapsed);
return fdbCreds;
#else
TraceEvent(SevError, "S3BlobStoreNoSDK");
throw backup_auth_missing();
#endif
}
ACTOR Future<Void> updateSecret_impl(Reference<S3BlobStoreEndpoint> b) { ACTOR Future<Void> updateSecret_impl(Reference<S3BlobStoreEndpoint> b) {
if (b->knobs.sdk_auth) {
b->credentials = getSecretSdk();
return Void();
}
std::vector<std::string>* pFiles = (std::vector<std::string>*)g_network->global(INetwork::enBlobCredentialFiles); std::vector<std::string>* pFiles = (std::vector<std::string>*)g_network->global(INetwork::enBlobCredentialFiles);
if (pFiles == nullptr) if (pFiles == nullptr)
return Void(); return Void();
@ -601,7 +636,7 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
.detail("RemoteEndpoint", conn->getPeerAddress()) .detail("RemoteEndpoint", conn->getPeerAddress())
.detail("ExpiresIn", b->knobs.max_connection_life); .detail("ExpiresIn", b->knobs.max_connection_life);
if (b->lookupKey || b->lookupSecret) if (b->lookupKey || b->lookupSecret || b->knobs.sdk_auth)
wait(b->updateSecret()); wait(b->updateSecret());
return S3BlobStoreEndpoint::ReusableConnection({ conn, now() + b->knobs.max_connection_life }); return S3BlobStoreEndpoint::ReusableConnection({ conn, now() + b->knobs.max_connection_life });

View File

@ -59,7 +59,7 @@ public:
delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests, delete_requests_per_second, multipart_max_part_size, multipart_min_part_size, concurrent_requests,
concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file, concurrent_uploads, concurrent_lists, concurrent_reads_per_file, concurrent_writes_per_file,
read_block_size, read_ahead_blocks, read_cache_blocks_per_file, max_send_bytes_per_second, read_block_size, read_ahead_blocks, read_cache_blocks_per_file, max_send_bytes_per_second,
max_recv_bytes_per_second; max_recv_bytes_per_second, sdk_auth;
bool set(StringRef name, int value); bool set(StringRef name, int value);
std::string getURLParameters() const; std::string getURLParameters() const;
static std::vector<std::string> getKnobDescriptions() { static std::vector<std::string> getKnobDescriptions() {
@ -91,7 +91,9 @@ public:
"read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.", "read_cache_blocks_per_file (or rcb) Size of the read cache for a file in blocks.",
"max_send_bytes_per_second (or sbps) Max send bytes per second for all requests combined.", "max_send_bytes_per_second (or sbps) Max send bytes per second for all requests combined.",
"max_recv_bytes_per_second (or rbps) Max receive bytes per second for all requests combined (NOT YET " "max_recv_bytes_per_second (or rbps) Max receive bytes per second for all requests combined (NOT YET "
"USED)." "USED).",
"sdk_auth (or sa) Use AWS SDK to resolve credentials. Only valid if "
"BUILD_AWS_BACKUP is enabled."
}; };
} }
}; };

View File

@ -2708,16 +2708,23 @@ Future<Optional<std::string>> FailedLocalitiesRangeImpl::commit(ReadYourWritesTr
} }
ACTOR Future<RangeResult> getTenantList(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) { ACTOR Future<RangeResult> getTenantList(ReadYourWritesTransaction* ryw, KeyRangeRef kr, GetRangeLimits limitsHint) {
KeyRangeRef tenantRange =
kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin)
.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
state KeyRef managementPrefix = state KeyRef managementPrefix =
kr.begin.substr(0, kr.begin.substr(0,
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() + SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin.size() +
TenantMapRangeImpl::submoduleRange.begin.size()); TenantMapRangeImpl::submoduleRange.begin.size());
std::map<TenantName, TenantMapEntry> tenants = wait(ManagementAPI::listTenantsTransaction( kr = kr.removePrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
&ryw->getTransaction(), tenantRange.begin, tenantRange.end, limitsHint.rows)); TenantNameRef beginTenant = kr.begin.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
TenantNameRef endTenant = kr.end;
if (endTenant.startsWith(TenantMapRangeImpl::submoduleRange.begin)) {
endTenant = endTenant.removePrefix(TenantMapRangeImpl::submoduleRange.begin);
} else {
endTenant = "\xff"_sr;
}
std::map<TenantName, TenantMapEntry> tenants =
wait(ManagementAPI::listTenantsTransaction(&ryw->getTransaction(), beginTenant, endTenant, limitsHint.rows));
RangeResult results; RangeResult results;
for (auto tenant : tenants) { for (auto tenant : tenants) {
@ -2787,7 +2794,7 @@ Future<Optional<std::string>> TenantMapRangeImpl::commit(ReadYourWritesTransacti
TenantNameRef endTenant = range.end().removePrefix( TenantNameRef endTenant = range.end().removePrefix(
SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin); SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::MANAGEMENT).begin);
if (endTenant.startsWith(submoduleRange.begin)) { if (endTenant.startsWith(submoduleRange.begin)) {
endTenant = endTenant.removePrefix(submoduleRange.end); endTenant = endTenant.removePrefix(submoduleRange.begin);
} else { } else {
endTenant = "\xff"_sr; endTenant = "\xff"_sr;
} }

View File

@ -319,9 +319,6 @@ ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranules(const KeyRange
Version beginVersion, Version beginVersion,
Optional<Version> readVersion, Optional<Version> readVersion,
ReadBlobGranuleContext granule_context) { ReadBlobGranuleContext granule_context) {
// In V1 of api this is required, field is just for forward compatibility
ASSERT(beginVersion == 0);
// FIXME: prevent from calling this from another main thread! // FIXME: prevent from calling this from another main thread!
ISingleThreadTransaction* tr = this->tr; ISingleThreadTransaction* tr = this->tr;

View File

@ -52,4 +52,7 @@ enum WellKnownEndpoints {
WLTOKEN_RESERVED_COUNT // 23 WLTOKEN_RESERVED_COUNT // 23
}; };
static_assert(WLTOKEN_PROTOCOL_INFO ==
10); // Enforce that the value of this endpoint does not change per comment above.
#endif #endif

View File

@ -10,6 +10,16 @@ endif()
# as soon as we get rid of the old build system # as soon as we get rid of the old build system
target_link_libraries(fdbmonitor PUBLIC Threads::Threads) target_link_libraries(fdbmonitor PUBLIC Threads::Threads)
# We don't compile fdbmonitor with thread sanitizer instrumentation, since this
# appears to change its behavior (it no longer seems to restart killed
# processes). fdbmonitor is single-threaded anyway.
get_target_property(fdbmonitor_options fdbmonitor COMPILE_OPTIONS)
list(REMOVE_ITEM fdbmonitor_options "-fsanitize=thread")
set_property(TARGET fdbmonitor PROPERTY COMPILE_OPTIONS ${target_options})
get_target_property(fdbmonitor_options fdbmonitor LINK_OPTIONS)
list(REMOVE_ITEM fdbmonitor_options "-fsanitize=thread")
set_property(TARGET fdbmonitor PROPERTY LINK_OPTIONS ${target_options})
if(GENERATE_DEBUG_PACKAGES) if(GENERATE_DEBUG_PACKAGES)
fdb_install(TARGETS fdbmonitor DESTINATION fdbmonitor COMPONENT server) fdb_install(TARGETS fdbmonitor DESTINATION fdbmonitor COMPONENT server)
else() else()

View File

@ -29,14 +29,7 @@ static std::map<NetworkAddress, std::pair<Reference<EvictablePageCache>, Referen
EvictablePage::~EvictablePage() { EvictablePage::~EvictablePage() {
if (data) { if (data) {
#if defined(USE_JEMALLOC) freeFast4kAligned(pageCache->pageSize, data);
aligned_free(data);
#else
if (pageCache->pageSize == 4096)
FastAllocator<4096>::release(data);
else
aligned_free(data);
#endif
} }
if (EvictablePageCache::RANDOM == pageCache->cacheEvictionType) { if (EvictablePageCache::RANDOM == pageCache->cacheEvictionType) {
if (index > -1) { if (index > -1) {
@ -173,14 +166,7 @@ void AsyncFileCached::releaseZeroCopy(void* data, int length, int64_t offset) {
if (o != orphanedPages.end()) { if (o != orphanedPages.end()) {
if (o->second == 1) { if (o->second == 1) {
if (data) { if (data) {
#if defined(USE_JEMALLOC) freeFast4kAligned(length, data);
aligned_free(data);
#else
if (length == 4096)
FastAllocator<4096>::release(data);
else
aligned_free(data);
#endif
} }
} else { } else {
--o->second; --o->second;

View File

@ -79,14 +79,9 @@ struct EvictablePageCache : ReferenceCounted<EvictablePageCache> {
void allocate(EvictablePage* page) { void allocate(EvictablePage* page) {
try_evict(); try_evict();
try_evict(); try_evict();
#if defined(USE_JEMALLOC)
page->data = aligned_alloc(4096, pageSize); page->data = allocateFast4kAligned(pageSize);
#else
page->data = pageSize == 4096 ? FastAllocator<4096>::allocate() : aligned_alloc(4096, pageSize);
#endif
if (page->data == nullptr) {
platform::outOfMemory();
}
if (RANDOM == cacheEvictionType) { if (RANDOM == cacheEvictionType) {
page->index = pages.size(); page->index = pages.size();
pages.push_back(page); pages.push_back(page);
@ -394,14 +389,7 @@ struct AFCPage : public EvictablePage, public FastAllocated<AFCPage> {
owner->orphanedPages[data] = zeroCopyRefCount; owner->orphanedPages[data] = zeroCopyRefCount;
zeroCopyRefCount = 0; zeroCopyRefCount = 0;
notReading = Void(); notReading = Void();
#if defined(USE_JEMALLOC) data = allocateFast4kAligned(pageCache->pageSize);
data = aligned_alloc(4096, pageCache->pageSize);
#else
data = pageCache->pageSize == 4096 ? FastAllocator<4096>::allocate() : aligned_alloc(4096, pageCache->pageSize);
#endif
if (data == nullptr) {
platform::outOfMemory();
}
} }
Future<Void> write(void const* data, int length, int offset) { Future<Void> write(void const* data, int length, int offset) {

View File

@ -2475,11 +2475,12 @@ ACTOR Future<Void> workerHealthMonitor(ClusterControllerData* self) {
} }
} }
ACTOR Future<Void> clusterControllerCore(ClusterControllerFullInterface interf, ACTOR Future<Void> clusterControllerCore(Reference<IClusterConnectionRecord> connRecord,
ClusterControllerFullInterface interf,
Future<Void> leaderFail, Future<Void> leaderFail,
ServerCoordinators coordinators,
LocalityData locality, LocalityData locality,
ConfigDBType configDBType) { ConfigDBType configDBType) {
state ServerCoordinators coordinators(connRecord);
state ClusterControllerData self(interf, locality, coordinators); state ClusterControllerData self(interf, locality, coordinators);
state ConfigBroadcaster configBroadcaster(coordinators, configDBType); state ConfigBroadcaster configBroadcaster(coordinators, configDBType);
state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY); state Future<Void> coordinationPingDelay = delay(SERVER_KNOBS->WORKER_COORDINATION_PING_DELAY);
@ -2612,7 +2613,7 @@ ACTOR Future<Void> replaceInterface(ClusterControllerFullInterface interf) {
} }
} }
ACTOR Future<Void> clusterController(ServerCoordinators coordinators, ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRecord,
Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC, Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> currentCC,
bool hasConnected, bool hasConnected,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo, Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo,
@ -2623,9 +2624,10 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
state bool inRole = false; state bool inRole = false;
cci.initEndpoints(); cci.initEndpoints();
try { try {
wait(connRecord->resolveHostnames());
// Register as a possible leader; wait to be elected // Register as a possible leader; wait to be elected
state Future<Void> leaderFail = state Future<Void> leaderFail =
tryBecomeLeader(coordinators, cci, currentCC, hasConnected, asyncPriorityInfo); tryBecomeLeader(connRecord, cci, currentCC, hasConnected, asyncPriorityInfo);
state Future<Void> shouldReplace = replaceInterface(cci); state Future<Void> shouldReplace = replaceInterface(cci);
while (!currentCC->get().present() || currentCC->get().get() != cci) { while (!currentCC->get().present() || currentCC->get().get() != cci) {
@ -2644,7 +2646,7 @@ ACTOR Future<Void> clusterController(ServerCoordinators coordinators,
startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID()); startRole(Role::CLUSTER_CONTROLLER, cci.id(), UID());
inRole = true; inRole = true;
wait(clusterControllerCore(cci, leaderFail, coordinators, locality, configDBType)); wait(clusterControllerCore(connRecord, cci, leaderFail, locality, configDBType));
} }
} catch (Error& e) { } catch (Error& e) {
if (inRole) if (inRole)
@ -2673,15 +2675,12 @@ ACTOR Future<Void> clusterController(Reference<IClusterConnectionRecord> connRec
state bool hasConnected = false; state bool hasConnected = false;
loop { loop {
try { try {
wait(connRecord->resolveHostnames()); wait(clusterController(connRecord, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType));
ServerCoordinators coordinators(connRecord); hasConnected = true;
wait(clusterController(coordinators, currentCC, hasConnected, asyncPriorityInfo, locality, configDBType));
} catch (Error& e) { } catch (Error& e) {
if (e.code() != error_code_coordinators_changed) if (e.code() != error_code_coordinators_changed)
throw; // Expected to terminate fdbserver throw; // Expected to terminate fdbserver
} }
hasConnected = true;
} }
} }

View File

@ -25,28 +25,56 @@
#include "fdbclient/MonitorLeader.h" #include "fdbclient/MonitorLeader.h"
#include "flow/actorcompiler.h" // This must be the last #include. #include "flow/actorcompiler.h" // This must be the last #include.
// Keep trying to become a leader by submitting itself to all coordinators.
// Monitor the health of all coordinators at the same time.
// Note: for coordinators whose NetworkAddress is parsed out of a hostname, a connection failure will cause this actor
// to throw `coordinators_changed()` error
ACTOR Future<Void> submitCandidacy(Key key, ACTOR Future<Void> submitCandidacy(Key key,
LeaderElectionRegInterface coord, LeaderElectionRegInterface coord,
LeaderInfo myInfo, LeaderInfo myInfo,
UID prevChangeID, UID prevChangeID,
Reference<AsyncVar<std::vector<Optional<LeaderInfo>>>> nominees, AsyncTrigger* nomineeChange,
int index) { Optional<LeaderInfo>* nominee,
Optional<Hostname> hostname = Optional<Hostname>()) {
loop { loop {
auto const& nom = nominees->get()[index]; state Optional<LeaderInfo> li;
Optional<LeaderInfo> li = wait(
retryBrokenPromise(coord.candidacy,
CandidacyRequest(key, myInfo, nom.present() ? nom.get().changeID : UID(), prevChangeID),
TaskPriority::CoordinationReply));
if (li != nominees->get()[index]) { if (coord.candidacy.getEndpoint().getPrimaryAddress().fromHostname) {
std::vector<Optional<LeaderInfo>> v = nominees->get(); state ErrorOr<Optional<LeaderInfo>> rep = wait(coord.candidacy.tryGetReply(
v[index] = li; CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID),
nominees->set(v); TaskPriority::CoordinationReply));
if (rep.isError()) {
// Connecting to nominee failed, most likely due to connection failed.
TraceEvent("SubmitCandadicyError")
.error(rep.getError())
.detail("Hostname", hostname.present() ? hostname.get().toString() : "UnknownHostname")
.detail("OldAddr", coord.candidacy.getEndpoint().getPrimaryAddress().toString());
if (rep.getError().code() == error_code_request_maybe_delivered) {
// Delay to prevent tight resolving loop due to outdated DNS cache
wait(delay(CLIENT_KNOBS->COORDINATOR_HOSTNAME_RESOLVE_DELAY));
throw coordinators_changed();
} else {
throw rep.getError();
}
} else if (rep.present()) {
li = rep.get();
}
} else {
Optional<LeaderInfo> tmp = wait(retryBrokenPromise(
coord.candidacy,
CandidacyRequest(key, myInfo, nominee->present() ? nominee->get().changeID : UID(), prevChangeID),
TaskPriority::CoordinationReply));
li = tmp;
}
wait(Future<Void>(Void())); // Make sure we weren't cancelled
if (li != *nominee) {
*nominee = li;
nomineeChange->trigger();
if (li.present() && li.get().forward) if (li.present() && li.get().forward)
wait(Future<Void>(Never())); wait(Future<Void>(Never()));
wait(Future<Void>(Void())); // Make sure we weren't cancelled
} }
} }
} }
@ -84,13 +112,14 @@ ACTOR Future<Void> changeLeaderCoordinators(ServerCoordinators coordinators, Val
return Void(); return Void();
} }
ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators, ACTOR Future<Void> tryBecomeLeaderInternal(Reference<IClusterConnectionRecord> connRecord,
Value proposedSerializedInterface, Value proposedSerializedInterface,
Reference<AsyncVar<Value>> outSerializedLeader, Reference<AsyncVar<Value>> outSerializedLeader,
bool hasConnected, bool hasConnected,
Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo) { Reference<AsyncVar<ClusterControllerPriorityInfo>> asyncPriorityInfo) {
state Reference<AsyncVar<std::vector<Optional<LeaderInfo>>>> nominees( state ServerCoordinators coordinators(connRecord);
new AsyncVar<std::vector<Optional<LeaderInfo>>>()); state AsyncTrigger nomineeChange;
state std::vector<Optional<LeaderInfo>> nominees;
state LeaderInfo myInfo; state LeaderInfo myInfo;
state Future<Void> candidacies; state Future<Void> candidacies;
state bool iAmLeader = false; state bool iAmLeader = false;
@ -105,8 +134,6 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
wait(delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY)); wait(delay(SERVER_KNOBS->WAIT_FOR_GOOD_RECRUITMENT_DELAY));
} }
nominees->set(std::vector<Optional<LeaderInfo>>(coordinators.clientLeaderServers.size()));
myInfo.serializedInfo = proposedSerializedInterface; myInfo.serializedInfo = proposedSerializedInterface;
outSerializedLeader->set(Value()); outSerializedLeader->set(Value());
@ -114,6 +141,9 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
(SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) ? buggifyDelayedAsyncVar(outSerializedLeader) : Void(); (SERVER_KNOBS->BUGGIFY_ALL_COORDINATION || BUGGIFY) ? buggifyDelayedAsyncVar(outSerializedLeader) : Void();
while (!iAmLeader) { while (!iAmLeader) {
wait(connRecord->resolveHostnames());
coordinators = ServerCoordinators(connRecord);
nominees.resize(coordinators.leaderElectionServers.size());
state Future<Void> badCandidateTimeout; state Future<Void> badCandidateTimeout;
myInfo.changeID = deterministicRandom()->randomUniqueID(); myInfo.changeID = deterministicRandom()->randomUniqueID();
@ -122,13 +152,25 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
std::vector<Future<Void>> cand; std::vector<Future<Void>> cand;
cand.reserve(coordinators.leaderElectionServers.size()); cand.reserve(coordinators.leaderElectionServers.size());
for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) for (int i = 0; i < coordinators.leaderElectionServers.size(); i++) {
cand.push_back(submitCandidacy( Optional<Hostname> hostname;
coordinators.clusterKey, coordinators.leaderElectionServers[i], myInfo, prevChangeID, nominees, i)); auto r = connRecord->getConnectionString().networkAddressToHostname.find(
coordinators.leaderElectionServers[i].candidacy.getEndpoint().getPrimaryAddress());
if (r != connRecord->getConnectionString().networkAddressToHostname.end()) {
hostname = r->second;
}
cand.push_back(submitCandidacy(coordinators.clusterKey,
coordinators.leaderElectionServers[i],
myInfo,
prevChangeID,
&nomineeChange,
&nominees[i],
hostname));
}
candidacies = waitForAll(cand); candidacies = waitForAll(cand);
loop { loop {
state Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees->get()); state Optional<std::pair<LeaderInfo, bool>> leader = getLeader(nominees);
if (leader.present() && leader.get().first.forward) { if (leader.present() && leader.get().first.forward) {
// These coordinators are forwarded to another set. But before we change our own cluster file, we need // These coordinators are forwarded to another set. But before we change our own cluster file, we need
// to make sure that a majority of coordinators know that. SOMEDAY: Wait briefly to see if other // to make sure that a majority of coordinators know that. SOMEDAY: Wait briefly to see if other
@ -172,22 +214,30 @@ ACTOR Future<Void> tryBecomeLeaderInternal(ServerCoordinators coordinators,
// If more than 2*SERVER_KNOBS->POLLING_FREQUENCY elapses while we are nominated by some coordinator but // If more than 2*SERVER_KNOBS->POLLING_FREQUENCY elapses while we are nominated by some coordinator but
// there is no leader, we might be breaking the leader election process for someone with better // there is no leader, we might be breaking the leader election process for someone with better
// communications but lower ID, so change IDs. // communications but lower ID, so change IDs.
if ((!leader.present() || !leader.get().second) && if ((!leader.present() || !leader.get().second) && std::count(nominees.begin(), nominees.end(), myInfo)) {
std::count(nominees->get().begin(), nominees->get().end(), myInfo)) {
if (!badCandidateTimeout.isValid()) if (!badCandidateTimeout.isValid())
badCandidateTimeout = delay(SERVER_KNOBS->POLLING_FREQUENCY * 2, TaskPriority::CoordinationReply); badCandidateTimeout = delay(SERVER_KNOBS->POLLING_FREQUENCY * 2, TaskPriority::CoordinationReply);
} else } else
badCandidateTimeout = Future<Void>(); badCandidateTimeout = Future<Void>();
choose { try {
when(wait(nominees->onChange())) {} choose {
when(wait(badCandidateTimeout.isValid() ? badCandidateTimeout : Never())) { when(wait(nomineeChange.onTrigger())) {}
TEST(true); // Bad candidate timeout when(wait(badCandidateTimeout.isValid() ? badCandidateTimeout : Never())) {
TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log(); TEST(true); // Bad candidate timeout
break; TraceEvent("LeaderBadCandidateTimeout", myInfo.changeID).log();
break;
}
when(wait(candidacies)) { ASSERT(false); }
when(wait(asyncPriorityInfo->onChange())) { break; }
}
} catch (Error& e) {
if (e.code() == error_code_coordinators_changed) {
connRecord->getConnectionString().resetToUnresolved();
break;
} else {
throw e;
} }
when(wait(candidacies)) { ASSERT(false); }
when(wait(asyncPriorityInfo->onChange())) { break; }
} }
} }

View File

@ -37,7 +37,7 @@ class ServerCoordinators;
// eventually be set. If the return value is cancelled, the candidacy or leadership of the proposedInterface // eventually be set. If the return value is cancelled, the candidacy or leadership of the proposedInterface
// will eventually end. // will eventually end.
template <class LeaderInterface> template <class LeaderInterface>
Future<Void> tryBecomeLeader(ServerCoordinators const& coordinators, Future<Void> tryBecomeLeader(Reference<IClusterConnectionRecord> const& connRecord,
LeaderInterface const& proposedInterface, LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected, bool hasConnected,
@ -50,20 +50,20 @@ Future<Void> changeLeaderCoordinators(ServerCoordinators const& coordinators, Va
#pragma region Implementation #pragma region Implementation
#endif // __INTEL_COMPILER #endif // __INTEL_COMPILER
Future<Void> tryBecomeLeaderInternal(ServerCoordinators const& coordinators, Future<Void> tryBecomeLeaderInternal(Reference<IClusterConnectionRecord> const& connRecord,
Value const& proposedSerializedInterface, Value const& proposedSerializedInterface,
Reference<AsyncVar<Value>> const& outSerializedLeader, Reference<AsyncVar<Value>> const& outSerializedLeader,
bool const& hasConnected, bool const& hasConnected,
Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo); Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo);
template <class LeaderInterface> template <class LeaderInterface>
Future<Void> tryBecomeLeader(ServerCoordinators const& coordinators, Future<Void> tryBecomeLeader(Reference<IClusterConnectionRecord> const& connRecord,
LeaderInterface const& proposedInterface, LeaderInterface const& proposedInterface,
Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader, Reference<AsyncVar<Optional<LeaderInterface>>> const& outKnownLeader,
bool hasConnected, bool hasConnected,
Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo) { Reference<AsyncVar<ClusterControllerPriorityInfo>> const& asyncPriorityInfo) {
auto serializedInfo = makeReference<AsyncVar<Value>>(); auto serializedInfo = makeReference<AsyncVar<Value>>();
Future<Void> m = tryBecomeLeaderInternal(coordinators, Future<Void> m = tryBecomeLeaderInternal(connRecord,
ObjectWriter::toValue(proposedInterface, IncludeVersion()), ObjectWriter::toValue(proposedInterface, IncludeVersion()),
serializedInfo, serializedInfo,
hasConnected, hasConnected,

View File

@ -2726,8 +2726,6 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
actors.push_back(serveProcess()); actors.push_back(serveProcess());
try { try {
wait(connRecord->resolveHostnames());
ServerCoordinators coordinators(connRecord);
if (g_network->isSimulated()) { if (g_network->isSimulated()) {
whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,"; whitelistBinPaths = ",, random_path, /bin/snap_create.sh,,";
} }
@ -2745,8 +2743,8 @@ ACTOR Future<Void> fdbd(Reference<IClusterConnectionRecord> connRecord,
if (coordFolder.size()) { if (coordFolder.size()) {
// SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up // SOMEDAY: remove the fileNotFound wrapper and make DiskQueue construction safe from errors setting up
// their files // their files
actors.push_back(fileNotFoundToNever( actors.push_back(
coordinationServer(coordFolder, coordinators.ccr, configNode, configBroadcastInterface))); fileNotFoundToNever(coordinationServer(coordFolder, connRecord, configNode, configBroadcastInterface)));
} }
state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder)); state UID processIDUid = wait(createAndLockProcessIdFile(dataFolder));

View File

@ -342,23 +342,23 @@ ArenaBlock* ArenaBlock::create(int dataSize, Reference<ArenaBlock>& next) {
b->bigSize = 256; b->bigSize = 256;
INSTRUMENT_ALLOCATE("Arena256"); INSTRUMENT_ALLOCATE("Arena256");
} else if (reqSize <= 512) { } else if (reqSize <= 512) {
b = (ArenaBlock*)FastAllocator<512>::allocate(); b = (ArenaBlock*)new uint8_t[512];
b->bigSize = 512; b->bigSize = 512;
INSTRUMENT_ALLOCATE("Arena512"); INSTRUMENT_ALLOCATE("Arena512");
} else if (reqSize <= 1024) { } else if (reqSize <= 1024) {
b = (ArenaBlock*)FastAllocator<1024>::allocate(); b = (ArenaBlock*)new uint8_t[1024];
b->bigSize = 1024; b->bigSize = 1024;
INSTRUMENT_ALLOCATE("Arena1024"); INSTRUMENT_ALLOCATE("Arena1024");
} else if (reqSize <= 2048) { } else if (reqSize <= 2048) {
b = (ArenaBlock*)FastAllocator<2048>::allocate(); b = (ArenaBlock*)new uint8_t[2048];
b->bigSize = 2048; b->bigSize = 2048;
INSTRUMENT_ALLOCATE("Arena2048"); INSTRUMENT_ALLOCATE("Arena2048");
} else if (reqSize <= 4096) { } else if (reqSize <= 4096) {
b = (ArenaBlock*)FastAllocator<4096>::allocate(); b = (ArenaBlock*)new uint8_t[4096];
b->bigSize = 4096; b->bigSize = 4096;
INSTRUMENT_ALLOCATE("Arena4096"); INSTRUMENT_ALLOCATE("Arena4096");
} else { } else {
b = (ArenaBlock*)FastAllocator<8192>::allocate(); b = (ArenaBlock*)new uint8_t[8192];
b->bigSize = 8192; b->bigSize = 8192;
INSTRUMENT_ALLOCATE("Arena8192"); INSTRUMENT_ALLOCATE("Arena8192");
} }
@ -460,26 +460,26 @@ void ArenaBlock::destroyLeaf() {
FastAllocator<256>::release(this); FastAllocator<256>::release(this);
INSTRUMENT_RELEASE("Arena256"); INSTRUMENT_RELEASE("Arena256");
} else if (bigSize <= 512) { } else if (bigSize <= 512) {
FastAllocator<512>::release(this); delete[] reinterpret_cast<uint8_t*>(this);
INSTRUMENT_RELEASE("Arena512"); INSTRUMENT_RELEASE("Arena512");
} else if (bigSize <= 1024) { } else if (bigSize <= 1024) {
FastAllocator<1024>::release(this); delete[] reinterpret_cast<uint8_t*>(this);
INSTRUMENT_RELEASE("Arena1024"); INSTRUMENT_RELEASE("Arena1024");
} else if (bigSize <= 2048) { } else if (bigSize <= 2048) {
FastAllocator<2048>::release(this); delete[] reinterpret_cast<uint8_t*>(this);
INSTRUMENT_RELEASE("Arena2048"); INSTRUMENT_RELEASE("Arena2048");
} else if (bigSize <= 4096) { } else if (bigSize <= 4096) {
FastAllocator<4096>::release(this); delete[] reinterpret_cast<uint8_t*>(this);
INSTRUMENT_RELEASE("Arena4096"); INSTRUMENT_RELEASE("Arena4096");
} else if (bigSize <= 8192) { } else if (bigSize <= 8192) {
FastAllocator<8192>::release(this); delete[] reinterpret_cast<uint8_t*>(this);
INSTRUMENT_RELEASE("Arena8192"); INSTRUMENT_RELEASE("Arena8192");
} else { } else {
#ifdef ALLOC_INSTRUMENTATION #ifdef ALLOC_INSTRUMENTATION
allocInstr["ArenaHugeKB"].dealloc((bigSize + 1023) >> 10); allocInstr["ArenaHugeKB"].dealloc((bigSize + 1023) >> 10);
#endif #endif
g_hugeArenaMemory.fetch_sub(bigSize); g_hugeArenaMemory.fetch_sub(bigSize);
delete[](uint8_t*) this; delete[] reinterpret_cast<uint8_t*>(this);
} }
} }
} }

View File

@ -210,13 +210,24 @@ public:
if (s != sizeof(Object)) if (s != sizeof(Object))
abort(); abort();
INSTRUMENT_ALLOCATE(typeid(Object).name()); INSTRUMENT_ALLOCATE(typeid(Object).name());
void* p = FastAllocator < sizeof(Object) <= 64 ? 64 : nextFastAllocatedSize(sizeof(Object)) > ::allocate();
return p; if constexpr (sizeof(Object) <= 256) {
void* p = FastAllocator < sizeof(Object) <= 64 ? 64 : nextFastAllocatedSize(sizeof(Object)) > ::allocate();
return p;
} else {
void* p = new uint8_t[nextFastAllocatedSize(sizeof(Object))];
return p;
}
} }
static void operator delete(void* s) { static void operator delete(void* s) {
INSTRUMENT_RELEASE(typeid(Object).name()); INSTRUMENT_RELEASE(typeid(Object).name());
FastAllocator<sizeof(Object) <= 64 ? 64 : nextFastAllocatedSize(sizeof(Object))>::release(s);
if constexpr (sizeof(Object) <= 256) {
FastAllocator<sizeof(Object) <= 64 ? 64 : nextFastAllocatedSize(sizeof(Object))>::release(s);
} else {
delete[] reinterpret_cast<uint8_t*>(s);
}
} }
// Redefine placement new so you can still use it // Redefine placement new so you can still use it
static void* operator new(size_t, void* p) { return p; } static void* operator new(size_t, void* p) { return p; }
@ -236,18 +247,6 @@ public:
return FastAllocator<128>::allocate(); return FastAllocator<128>::allocate();
if (size <= 256) if (size <= 256)
return FastAllocator<256>::allocate(); return FastAllocator<256>::allocate();
if (size <= 512)
return FastAllocator<512>::allocate();
if (size <= 1024)
return FastAllocator<1024>::allocate();
if (size <= 2048)
return FastAllocator<2048>::allocate();
if (size <= 4096)
return FastAllocator<4096>::allocate();
if (size <= 8192)
return FastAllocator<8192>::allocate();
if (size <= 16384)
return FastAllocator<16384>::allocate();
return new uint8_t[size]; return new uint8_t[size];
} }
@ -264,21 +263,11 @@ inline void freeFast(int size, void* ptr) {
return FastAllocator<128>::release(ptr); return FastAllocator<128>::release(ptr);
if (size <= 256) if (size <= 256)
return FastAllocator<256>::release(ptr); return FastAllocator<256>::release(ptr);
if (size <= 512)
return FastAllocator<512>::release(ptr);
if (size <= 1024)
return FastAllocator<1024>::release(ptr);
if (size <= 2048)
return FastAllocator<2048>::release(ptr);
if (size <= 4096)
return FastAllocator<4096>::release(ptr);
if (size <= 8192)
return FastAllocator<8192>::release(ptr);
if (size <= 16384)
return FastAllocator<16384>::release(ptr);
delete[](uint8_t*) ptr; delete[](uint8_t*) ptr;
} }
// Allocate a block of memory aligned to 4096 bytes. Size must be a multiple of
// 4096. Guaranteed not to return null. Use freeFast4kAligned to free.
[[nodiscard]] inline void* allocateFast4kAligned(int size) { [[nodiscard]] inline void* allocateFast4kAligned(int size) {
#if !defined(USE_JEMALLOC) #if !defined(USE_JEMALLOC)
// Use FastAllocator for sizes it supports to avoid internal fragmentation in some implementations of aligned_alloc // Use FastAllocator for sizes it supports to avoid internal fragmentation in some implementations of aligned_alloc
@ -296,6 +285,7 @@ inline void freeFast(int size, void* ptr) {
return result; return result;
} }
// Free a pointer returned from allocateFast4kAligned(size)
inline void freeFast4kAligned(int size, void* ptr) { inline void freeFast4kAligned(int size, void* ptr) {
#if !defined(USE_JEMALLOC) #if !defined(USE_JEMALLOC)
// Sizes supported by FastAllocator must be release via FastAllocator // Sizes supported by FastAllocator must be release via FastAllocator

View File

@ -19,6 +19,7 @@
*/ */
#include "flow/flat_buffers.h" #include "flow/flat_buffers.h"
#include "flow/FileIdentifier.h"
#include "flow/UnitTest.h" #include "flow/UnitTest.h"
#include "flow/Arena.h" #include "flow/Arena.h"
#include "flow/serialize.h" #include "flow/serialize.h"
@ -26,6 +27,7 @@
#include <algorithm> #include <algorithm>
#include <iomanip> #include <iomanip>
#include <unordered_set>
#include <variant> #include <variant>
namespace detail { namespace detail {
@ -361,6 +363,7 @@ struct string_serialized_traits<Void> : std::true_type {
namespace unit_tests { namespace unit_tests {
struct Y1 { struct Y1 {
constexpr static FileIdentifier file_identifier = 338229;
int a; int a;
template <class Archiver> template <class Archiver>
@ -369,6 +372,14 @@ struct Y1 {
} }
}; };
struct Y1Hasher {
std::size_t operator()(const Y1& y) const noexcept { return std::hash<int>()(y.a); }
};
struct Y1Equal {
bool operator()(const Y1& l, const Y1& r) const { return l.a == r.a; }
};
struct Y2 { struct Y2 {
int a; int a;
std::variant<int> b; std::variant<int> b;
@ -563,4 +574,43 @@ TEST_CASE("/flow/FlatBuffers/EmptyPreSerVectorRefs") {
return Void(); return Void();
} }
TEST_CASE("/flow/FlatBuffers/EmptyUnorderedSet") {
int kSize = deterministicRandom()->randomInt(0, 100);
Standalone<StringRef> msg =
ObjectWriter::toValue(std::vector<std::unordered_set<Y1, Y1Hasher, Y1Equal>>(kSize), Unversioned());
ObjectReader rd(msg.begin(), Unversioned());
std::vector<std::unordered_set<Y1, Y1Hasher, Y1Equal>> xs;
rd.deserialize(xs);
ASSERT(xs.size() == kSize);
for (const auto& x : xs) {
ASSERT(x.size() == 0);
}
return Void();
}
TEST_CASE("/flow/FlatBuffers/NonEmptyUnorderedSet") {
int kSize = deterministicRandom()->randomInt(0, 100);
std::vector<std::unordered_set<Y1, Y1Hasher, Y1Equal>> src;
std::unordered_set<Y1, Y1Hasher, Y1Equal> s;
for (int i = 0; i < kSize; i++) {
Y1 y;
y.a = i;
s.insert(y);
}
src.push_back(s);
Standalone<StringRef> msg = ObjectWriter::toValue(src, Unversioned());
ObjectReader rd(msg.begin(), Unversioned());
std::vector<std::unordered_set<Y1, Y1Hasher, Y1Equal>> xs;
rd.deserialize(xs);
ASSERT(xs.size() == 1);
ASSERT(xs[0].size() == kSize);
for (int i = 0; i < kSize; i++) {
Y1 y;
y.a = i;
ASSERT(xs[0].find(y) != xs[0].end());
}
return Void();
}
} // namespace unit_tests } // namespace unit_tests

View File

@ -35,6 +35,7 @@
#include <cstring> #include <cstring>
#include <array> #include <array>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
#include <deque> #include <deque>
#include "flow/FileIdentifier.h" #include "flow/FileIdentifier.h"
#include "flow/ObjectSerializerTraits.h" #include "flow/ObjectSerializerTraits.h"
@ -250,6 +251,31 @@ struct vector_like_traits<std::set<Key, Compare, Allocator>> : std::true_type {
return v.begin(); return v.begin();
} }
}; };
template <class Key, class Hash, class KeyEqual, class Allocator>
struct vector_like_traits<std::unordered_set<Key, Hash, KeyEqual, Allocator>> : std::true_type {
using Vec = std::unordered_set<Key, Hash, KeyEqual, Allocator>;
using value_type = Key;
using iterator = typename Vec::const_iterator;
using insert_iterator = std::insert_iterator<Vec>;
template <class Context>
static size_t num_entries(const Vec& v, Context&) {
return v.size();
}
template <class Context>
static void reserve(Vec& v, size_t size, Context&) {
v.reserve(size);
}
template <class Context>
static insert_iterator insert(Vec& v, Context&) {
return std::inserter(v, v.end());
}
template <class Context>
static iterator begin(const Vec& v, Context&) {
return v.begin();
}
};
template <> template <>
struct dynamic_size_traits<std::string> : std::true_type { struct dynamic_size_traits<std::string> : std::true_type {

View File

@ -20,6 +20,7 @@
#ifndef FLOW_SERIALIZE_H #ifndef FLOW_SERIALIZE_H
#define FLOW_SERIALIZE_H #define FLOW_SERIALIZE_H
#include <unordered_set>
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>
@ -172,6 +173,13 @@ template <class T, class Allocator>
struct CompositionDepthFor<std::vector<T, Allocator>> : std::integral_constant<int, CompositionDepthFor<T>::value + 1> { struct CompositionDepthFor<std::vector<T, Allocator>> : std::integral_constant<int, CompositionDepthFor<T>::value + 1> {
}; };
template <class Key, class Hash, class KeyEqual, class Allocator>
struct FileIdentifierFor<std::unordered_set<Key, Hash, KeyEqual, Allocator>> : ComposedIdentifierExternal<Key, 6> {};
template <class Key, class Hash, class KeyEqual, class Allocator>
struct CompositionDepthFor<std::unordered_set<Key, Hash, KeyEqual, Allocator>>
: std::integral_constant<int, CompositionDepthFor<Key>::value + 1> {};
template <class Archive, class T> template <class Archive, class T>
inline void save(Archive& ar, const std::vector<T>& value) { inline void save(Archive& ar, const std::vector<T>& value) {
ar << (int)value.size(); ar << (int)value.size();
@ -762,9 +770,6 @@ private:
public: public:
static PacketBuffer* create(size_t size = 0) { static PacketBuffer* create(size_t size = 0) {
size = std::max(size, PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD); size = std::max(size, PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD);
if (size == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) {
return new (FastAllocator<PACKET_BUFFER_MIN_SIZE>::allocate()) PacketBuffer{ size };
}
uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD]; uint8_t* mem = new uint8_t[size + PACKET_BUFFER_OVERHEAD];
return new (mem) PacketBuffer{ size }; return new (mem) PacketBuffer{ size };
} }
@ -772,11 +777,7 @@ public:
void addref() { ++reference_count; } void addref() { ++reference_count; }
void delref() { void delref() {
if (!--reference_count) { if (!--reference_count) {
if (size_ == PACKET_BUFFER_MIN_SIZE - PACKET_BUFFER_OVERHEAD) { delete[] reinterpret_cast<uint8_t*>(this);
FastAllocator<PACKET_BUFFER_MIN_SIZE>::release(this);
} else {
delete[] this;
}
} }
} }
int bytes_unwritten() const { return size_ - bytes_written; } int bytes_unwritten() const { return size_ - bytes_written; }

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifyAtomicOps' testTitle = 'BlobGranuleVerifyAtomicOps'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifyCycle' testTitle = 'BlobGranuleVerifyCycle'

View File

@ -1,6 +1,8 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
storageEngineExcludeTypes = [3] # FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet # FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [3, 4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifySmall' testTitle = 'BlobGranuleVerifySmall'

View File

@ -1,6 +1,8 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
storageEngineExcludeTypes = [3] # FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet # FIXME: exclude redwood because WriteDuringRead can write massive KV pairs and we don't chunk change feed data on disk yet
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [3, 4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifySmallClean' testTitle = 'BlobGranuleVerifySmallClean'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleCorrectness' testTitle = 'BlobGranuleCorrectness'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleCorrectness' testTitle = 'BlobGranuleCorrectness'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifyBalance' testTitle = 'BlobGranuleVerifyBalance'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifyBalanceClean' testTitle = 'BlobGranuleVerifyBalanceClean'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifyLarge' testTitle = 'BlobGranuleVerifyLarge'

View File

@ -1,5 +1,7 @@
[configuration] [configuration]
blobGranulesEnabled = true blobGranulesEnabled = true
# FIXME: re-enable rocks at some point
storageEngineExcludeTypes = [4]
[[test]] [[test]]
testTitle = 'BlobGranuleVerifyLargeClean' testTitle = 'BlobGranuleVerifyLargeClean'