Merge branch 'main' of github.com:apple/foundationdb into metacluster-assigned-cluster

Jon Fu 2022-09-26 09:59:20 -07:00
commit 2d1a6b4434
68 changed files with 1256 additions and 1133 deletions

View File

@ -44,7 +44,8 @@ private:
OP_GET_GRANULES,
OP_SUMMARIZE,
OP_GET_BLOB_RANGES,
OP_LAST = OP_GET_BLOB_RANGES
OP_VERIFY,
OP_LAST = OP_VERIFY
};
std::vector<OpType> excludedOpTypes;
@ -156,6 +157,12 @@ private:
}
void randomSummarizeOp(TTaskFct cont, std::optional<int> tenantId) {
if (!seenReadSuccess) {
// tester can't handle this throwing bg_txn_too_old, so just don't call it unless we have already seen a
// read success
schedule(cont);
return;
}
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
@ -174,11 +181,9 @@ private:
true);
},
[this, begin, end, results, cont]() {
if (seenReadSuccess) {
ASSERT(results->size() > 0);
ASSERT(results->front().keyRange.beginKey <= begin);
ASSERT(results->back().keyRange.endKey >= end);
}
ASSERT(results->size() > 0);
ASSERT(results->front().keyRange.beginKey <= begin);
ASSERT(results->back().keyRange.endKey >= end);
for (int i = 0; i < results->size(); i++) {
// TODO: could do validation of subsequent calls and ensure snapshot version never decreases
@ -254,6 +259,39 @@ private:
/* failOnError = */ false);
}
void randomVerifyOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
auto verifyVersion = std::make_shared<int64_t>(0);
// info("Verify op starting");
execOperation(
[begin, end, verifyVersion](auto ctx) {
fdb::Future f = ctx->db().verifyBlobRange(begin, end, -2 /* latest version */).eraseType();
ctx->continueAfter(f, [ctx, verifyVersion, f]() {
*verifyVersion = f.get<fdb::future_var::Int64>();
ctx->done();
});
},
[this, begin, end, verifyVersion, cont]() {
if (*verifyVersion == -1) {
ASSERT(!seenReadSuccess);
} else {
if (!seenReadSuccess) {
info("BlobGranuleCorrectness::randomVerifyOp first success");
}
seenReadSuccess = true;
}
// info(fmt::format("verify op done @ {}", *verifyVersion));
schedule(cont);
},
/* failOnError = */ false);
}
void randomOperation(TTaskFct cont) {
std::optional<int> tenantId = randomTenant();
@ -284,6 +322,9 @@ private:
case OP_GET_BLOB_RANGES:
randomGetBlobRangesOp(cont);
break;
case OP_VERIFY:
randomVerifyOp(cont);
break;
}
}
};
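A note on the pattern above: the verify op shares its result slot between the start lambda and the validation continuation through a std::shared_ptr, since both callbacks outlive the enclosing scope. A minimal standalone sketch of that pattern (all names below are illustrative, not part of the tester):

#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>

// Run an async-style start step, then a validation step that inspects the
// result the start step stored, like verifyVersion in randomVerifyOp above.
void execOperation(std::function<void(std::shared_ptr<int64_t>)> start,
                   std::function<void()> validate,
                   std::shared_ptr<int64_t> slot) {
	start(slot); // in the tester this completes asynchronously via continueAfter
	validate();
}

int main() {
	auto verifyVersion = std::make_shared<int64_t>(0);
	execOperation([](std::shared_ptr<int64_t> slot) { *slot = 42; },
	              [verifyVersion]() {
		              // -1 would mean verification failed before any read succeeded
		              std::cout << "verify op done @ " << *verifyVersion << "\n";
	              },
	              verifyVersion);
}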

View File

@ -34,10 +34,21 @@ private:
OP_READ_NO_MATERIALIZE,
OP_READ_FILE_LOAD_ERROR,
OP_READ_TOO_OLD,
OP_CANCEL_RANGES,
OP_LAST = OP_CANCEL_RANGES
OP_PURGE_UNALIGNED,
OP_BLOBBIFY_UNALIGNED,
OP_UNBLOBBIFY_UNALIGNED,
OP_CANCEL_GET_GRANULES,
OP_CANCEL_GET_RANGES,
OP_CANCEL_VERIFY,
OP_CANCEL_SUMMARIZE,
OP_CANCEL_BLOBBIFY,
OP_CANCEL_UNBLOBBIFY,
OP_CANCEL_PURGE,
OP_LAST = OP_CANCEL_PURGE
};
// could add "summarize too old" and "verify too old" ops if desired, but those are lower value
// Allow reads at the start to get blob_granule_transaction_too_old if BG data isn't initialized yet
// FIXME: should still guarantee a read succeeds eventually somehow
bool seenReadSuccess = false;
@ -74,9 +85,6 @@ private:
error(fmt::format("Operation succeeded in error test!"));
}
ASSERT(err.code() != error_code_success);
if (err.code() != error_code_blob_granule_transaction_too_old) {
seenReadSuccess = true;
}
if (err.code() != expectedError) {
info(fmt::format("incorrect error. Expected {}, Got {}", expectedError, err.code()));
if (err.code() == error_code_blob_granule_transaction_too_old) {
@ -86,6 +94,9 @@ private:
ctx->onError(err);
}
} else {
if (err.code() != error_code_blob_granule_transaction_too_old) {
seenReadSuccess = true;
}
ctx->done();
}
},
@ -107,7 +118,55 @@ private:
doErrorOp(cont, "", true, 1, error_code_blob_granule_transaction_too_old);
}
void randomCancelGetRangesOp(TTaskFct cont) {
void randomPurgeUnalignedOp(TTaskFct cont) {
// purge boundaries need to be aligned to blob range boundaries, so an unaligned purge should always fail
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[this, begin, end](auto ctx) {
fdb::Future f = ctx->db().purgeBlobGranules(begin, end, -2, false).eraseType();
ctx->continueAfter(
f,
[this, ctx, f]() {
info(fmt::format("unaligned purge got {}", f.error().code()));
ASSERT(f.error().code() == error_code_unsupported_operation);
ctx->done();
},
true);
},
[this, cont]() { schedule(cont); });
}
void randomBlobbifyUnalignedOp(bool blobbify, TTaskFct cont) {
// blobbify/unblobbify need to be aligned to blob range boundaries, so this should always return false
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
auto success = std::make_shared<bool>(false);
execOperation(
[begin, end, blobbify, success](auto ctx) {
fdb::Future f = blobbify ? ctx->db().blobbifyRange(begin, end).eraseType()
: ctx->db().unblobbifyRange(begin, end).eraseType();
ctx->continueAfter(
f,
[ctx, f, success]() {
*success = f.get<fdb::future_var::Bool>();
ctx->done();
},
true);
},
[this, cont, success]() {
ASSERT(!(*success));
schedule(cont);
});
}
void randomCancelGetGranulesOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
@ -121,6 +180,90 @@ private:
[this, cont]() { schedule(cont); });
}
void randomCancelGetRangesOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[begin, end](auto ctx) {
fdb::Future f = ctx->db().listBlobbifiedRanges(begin, end, 1000).eraseType();
ctx->done();
},
[this, cont]() { schedule(cont); });
}
void randomCancelVerifyOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[begin, end](auto ctx) {
fdb::Future f = ctx->db().verifyBlobRange(begin, end, -2 /* latest version */).eraseType();
ctx->done();
},
[this, cont]() { schedule(cont); });
}
void randomCancelSummarizeOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execTransaction(
[begin, end](auto ctx) {
fdb::Future f = ctx->tx().summarizeBlobGranules(begin, end, -2, 1000).eraseType();
ctx->done();
},
[this, cont]() { schedule(cont); });
}
void randomCancelBlobbifyOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[begin, end](auto ctx) {
fdb::Future f = ctx->db().blobbifyRange(begin, end).eraseType();
ctx->done();
},
[this, cont]() { schedule(cont); });
}
void randomCancelUnblobbifyOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[begin, end](auto ctx) {
fdb::Future f = ctx->db().unblobbifyRange(begin, end).eraseType();
ctx->done();
},
[this, cont]() { schedule(cont); });
}
void randomCancelPurgeOp(TTaskFct cont) {
fdb::Key begin = randomKeyName();
fdb::Key end = randomKeyName();
if (begin > end) {
std::swap(begin, end);
}
execOperation(
[begin, end](auto ctx) {
fdb::Future f = ctx->db().purgeBlobGranules(begin, end, -2, false).eraseType();
ctx->done();
},
[this, cont]() { schedule(cont); });
}
void randomOperation(TTaskFct cont) override {
OpType txType = (OpType)Random::get().randomInt(0, OP_LAST);
switch (txType) {
@ -133,9 +276,37 @@ private:
case OP_READ_TOO_OLD:
randomOpReadTooOld(cont);
break;
case OP_CANCEL_RANGES:
case OP_PURGE_UNALIGNED:
// FIXME: gets the correct error, but the error doesn't propagate properly in the test
// randomPurgeUnalignedOp(cont);
break;
case OP_BLOBBIFY_UNALIGNED:
randomBlobbifyUnalignedOp(true, cont);
break;
case OP_UNBLOBBIFY_UNALIGNED:
randomBlobbifyUnalignedOp(false, cont);
break;
case OP_CANCEL_GET_GRANULES:
randomCancelGetGranulesOp(cont);
break;
case OP_CANCEL_GET_RANGES:
randomCancelGetRangesOp(cont);
break;
case OP_CANCEL_VERIFY:
randomCancelVerifyOp(cont);
break;
case OP_CANCEL_SUMMARIZE:
randomCancelSummarizeOp(cont);
break;
case OP_CANCEL_BLOBBIFY:
randomCancelBlobbifyOp(cont);
break;
case OP_CANCEL_UNBLOBBIFY:
randomCancelUnblobbifyOp(cont);
break;
case OP_CANCEL_PURGE:
randomCancelPurgeOp(cont);
break;
}
}
};
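The dispatch above relies on the OP_LAST sentinel aliasing the final enumerator, so a uniform draw over the inclusive range [0, OP_LAST] reaches every op, including the newly added cancel ops. A compact, self-contained sketch of that sentinel pattern (hypothetical ops, std::uniform_int_distribution standing in for the tester's Random helper):

#include <iostream>
#include <random>

// OP_LAST aliases the final enumerator rather than counting entries, so the
// inclusive draw below reaches every operation without over-running the enum.
enum OpType { OP_READ, OP_VERIFY, OP_CANCEL_PURGE, OP_LAST = OP_CANCEL_PURGE };

int main() {
	std::mt19937 rng{ std::random_device{}() };
	std::uniform_int_distribution<int> dist(0, OP_LAST); // inclusive on both ends
	switch (static_cast<OpType>(dist(rng))) {
	case OP_READ:
		std::cout << "read\n";
		break;
	case OP_VERIFY:
		std::cout << "verify\n";
		break;
	case OP_CANCEL_PURGE:
		std::cout << "cancel purge\n";
		break;
	}
}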

View File

@ -155,6 +155,13 @@ struct None {
struct Type {};
static Error extract(native::FDBFuture*, Type&) noexcept { return Error(0); }
};
struct Bool {
using Type = native::fdb_bool_t;
static Error extract(native::FDBFuture* f, Type& out) noexcept {
auto err = native::fdb_future_get_bool(f, &out);
return Error(err);
}
};
struct Int64 {
using Type = int64_t;
static Error extract(native::FDBFuture* f, Type& out) noexcept {
@ -775,10 +782,43 @@ public:
TypedFuture<future_var::KeyRangeRefArray> listBlobbifiedRanges(KeyRef begin, KeyRef end, int rangeLimit) {
if (!db)
throw std::runtime_error("list_blobbified_ranges from null database");
throw std::runtime_error("listBlobbifiedRanges from null database");
return native::fdb_database_list_blobbified_ranges(
db.get(), begin.data(), intSize(begin), end.data(), intSize(end), rangeLimit);
}
TypedFuture<future_var::Int64> verifyBlobRange(KeyRef begin, KeyRef end, int64_t version) {
if (!db)
throw std::runtime_error("verifyBlobRange from null database");
return native::fdb_database_verify_blob_range(
db.get(), begin.data(), intSize(begin), end.data(), intSize(end), version);
}
TypedFuture<future_var::Bool> blobbifyRange(KeyRef begin, KeyRef end) {
if (!db)
throw std::runtime_error("blobbifyRange from null database");
return native::fdb_database_blobbify_range(db.get(), begin.data(), intSize(begin), end.data(), intSize(end));
}
TypedFuture<future_var::Bool> unblobbifyRange(KeyRef begin, KeyRef end) {
if (!db)
throw std::runtime_error("unblobbifyRange from null database");
return native::fdb_database_unblobbify_range(db.get(), begin.data(), intSize(begin), end.data(), intSize(end));
}
TypedFuture<future_var::KeyRef> purgeBlobGranules(KeyRef begin, KeyRef end, int64_t version, bool force) {
if (!db)
throw std::runtime_error("purgeBlobGranules from null database");
native::fdb_bool_t forceBool = force;
return native::fdb_database_purge_blob_granules(
db.get(), begin.data(), intSize(begin), end.data(), intSize(end), version, forceBool);
}
TypedFuture<future_var::None> waitPurgeGranulesComplete(KeyRef purgeKey) {
if (!db)
throw std::runtime_error("purgeBlobGranules from null database");
return native::fdb_database_wait_purge_granules_complete(db.get(), purgeKey.data(), intSize(purgeKey));
}
};
inline Error selectApiVersionNothrow(int version) {
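The new future_var::Bool follows the extractor contract used throughout fdb_api.hpp: each traits struct pairs a Type alias with a static extract() that writes through an out parameter and returns an Error (0 on success), which lets TypedFuture stay generic over result kinds. A self-contained sketch of that contract with a stand-in future type (everything below is illustrative; the real extractors wrap fdb_c getters such as fdb_future_get_bool):

#include <iostream>

struct FakeFuture { // stand-in for native::FDBFuture
	bool ready = true;
	long long payload = 7;
};

struct Error {
	int err;
	explicit Error(int e) : err(e) {}
};

namespace future_var {
struct Int64 {
	using Type = long long;
	// Same shape as the extractors above: out parameter plus error code.
	static Error extract(FakeFuture* f, Type& out) noexcept {
		if (!f->ready)
			return Error(1);
		out = f->payload;
		return Error(0);
	}
};
} // namespace future_var

// A TypedFuture-style accessor that only knows the traits contract.
template <class VarTraits>
typename VarTraits::Type get(FakeFuture* f) {
	typename VarTraits::Type out{};
	Error e = VarTraits::extract(f, out);
	if (e.err != 0)
		throw e; // the real binding surfaces the error to the caller
	return out;
}

int main() {
	FakeFuture f;
	std::cout << get<future_var::Int64>(&f) << "\n"; // prints 7
}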

View File

@ -75,38 +75,3 @@ add_custom_command(OUTPUT ${package_file}
add_custom_target(python_package DEPENDS ${package_file})
add_dependencies(python_package python_binding)
add_dependencies(packages python_package)
if (NOT WIN32 AND NOT OPEN_FOR_IDE)
add_fdbclient_test(
NAME single_process_fdbcli_tests
COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
)
add_fdbclient_test(
NAME multi_process_fdbcli_tests
PROCESS_NUMBER 5
COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
5
)
if (TARGET external_client) # external_client copies fdb_c to bindings/c/libfdb_c_external.so
add_fdbclient_test(
NAME single_process_external_client_fdbcli_tests
COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
--external-client-library ${CMAKE_BINARY_DIR}/bindings/c/libfdb_c_external.so
)
add_fdbclient_test(
NAME multi_process_external_client_fdbcli_tests
PROCESS_NUMBER 5
COMMAND ${CMAKE_SOURCE_DIR}/bindings/python/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
5
--external-client-library ${CMAKE_BINARY_DIR}/bindings/c/libfdb_c_external.so
)
endif()
endif()

View File

@ -4,31 +4,42 @@ find_package(RocksDB 6.27.3)
include(ExternalProject)
if (RocksDB_FOUND)
set(RocksDB_CMAKE_ARGS
-DUSE_RTTI=1
-DPORTABLE=${PORTABLE_ROCKSDB}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_CXX_STANDARD=${CMAKE_CXX_STANDARD}
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DCMAKE_SHARED_LINKER_FLAGS=${CMAKE_SHARED_LINKER_FLAGS}
-DCMAKE_STATIC_LINKER_FLAGS=${CMAKE_STATIC_LINKER_FLAGS}
-DCMAKE_EXE_LINKER_FLAGS=${CMAKE_EXE_LINKER_FLAGS}
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DFAIL_ON_WARNINGS=OFF
-DWITH_GFLAGS=OFF
-DWITH_TESTS=OFF
-DWITH_TOOLS=OFF
-DWITH_CORE_TOOLS=OFF
-DWITH_BENCHMARK_TOOLS=OFF
-DWITH_BZ2=OFF
-DWITH_LZ4=ON
-DWITH_SNAPPY=OFF
-DWITH_ZLIB=OFF
-DWITH_ZSTD=OFF
-DWITH_LIBURING=${WITH_LIBURING}
-DWITH_TSAN=${USE_TSAN}
-DWITH_ASAN=${USE_ASAN}
-DWITH_UBSAN=${USE_UBSAN}
-DROCKSDB_BUILD_SHARED=OFF
-DCMAKE_POSITION_INDEPENDENT_CODE=True
)
if(ROCKSDB_FOUND)
ExternalProject_Add(rocksdb
SOURCE_DIR "${RocksDB_ROOT}"
DOWNLOAD_COMMAND ""
CMAKE_ARGS -DUSE_RTTI=1 -DPORTABLE=${PORTABLE_ROCKSDB}
-DCMAKE_CXX_STANDARD=${CMAKE_CXX_STANDARD}
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DFAIL_ON_WARNINGS=OFF
-DWITH_GFLAGS=OFF
-DWITH_TESTS=OFF
-DWITH_TOOLS=OFF
-DWITH_CORE_TOOLS=OFF
-DWITH_BENCHMARK_TOOLS=OFF
-DWITH_BZ2=OFF
-DWITH_LZ4=ON
-DWITH_SNAPPY=OFF
-DWITH_ZLIB=OFF
-DWITH_ZSTD=OFF
-DWITH_LIBURING=${WITH_LIBURING}
-DWITH_TSAN=${USE_TSAN}
-DWITH_ASAN=${USE_ASAN}
-DWITH_UBSAN=${USE_UBSAN}
-DROCKSDB_BUILD_SHARED=OFF
-DCMAKE_POSITION_INDEPENDENT_CODE=True
CMAKE_ARGS ${RocksDB_CMAKE_ARGS}
BUILD_BYPRODUCTS <BINARY_DIR>/librocksdb.a
INSTALL_COMMAND ""
)
@ -38,29 +49,9 @@ if (RocksDB_FOUND)
${BINARY_DIR}/librocksdb.a)
else()
ExternalProject_Add(rocksdb
URL https://github.com/facebook/rocksdb/archive/refs/tags/v6.27.3.tar.gz
URL_HASH SHA256=ee29901749b9132692b26f0a6c1d693f47d1a9ed8e3771e60556afe80282bf58
CMAKE_ARGS -DUSE_RTTI=1 -DPORTABLE=${PORTABLE_ROCKSDB}
-DCMAKE_CXX_STANDARD=${CMAKE_CXX_STANDARD}
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DFAIL_ON_WARNINGS=OFF
-DWITH_GFLAGS=OFF
-DWITH_TESTS=OFF
-DWITH_TOOLS=OFF
-DWITH_CORE_TOOLS=OFF
-DWITH_BENCHMARK_TOOLS=OFF
-DWITH_BZ2=OFF
-DWITH_LZ4=ON
-DWITH_SNAPPY=OFF
-DWITH_ZLIB=OFF
-DWITH_ZSTD=OFF
-DWITH_LIBURING=${WITH_LIBURING}
-DWITH_TSAN=${USE_TSAN}
-DWITH_ASAN=${USE_ASAN}
-DWITH_UBSAN=${USE_UBSAN}
-DROCKSDB_BUILD_SHARED=OFF
-DCMAKE_POSITION_INDEPENDENT_CODE=True
URL https://github.com/facebook/rocksdb/archive/refs/tags/v6.27.3.tar.gz
URL_HASH SHA256=ee29901749b9132692b26f0a6c1d693f47d1a9ed8e3771e60556afe80282bf58
CMAKE_ARGS ${RocksDB_CMAKE_ARGS}
BUILD_BYPRODUCTS <BINARY_DIR>/librocksdb.a
INSTALL_COMMAND ""
)
@ -70,7 +61,7 @@ else()
${BINARY_DIR}/librocksdb.a)
ExternalProject_Get_Property(rocksdb SOURCE_DIR)
set (ROCKSDB_INCLUDE_DIR "${SOURCE_DIR}/include")
set(ROCKSDB_INCLUDE_DIR "${SOURCE_DIR}/include")
set(ROCKSDB_FOUND TRUE)
endif()

View File

@ -292,19 +292,21 @@ else()
#add_compile_options(-fno-builtin-memcpy)
if (CLANG OR ICX)
add_compile_options()
if (APPLE OR USE_LIBCXX)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-stdlib=libc++>)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
if (NOT APPLE)
if (STATIC_LINK_LIBCXX)
add_link_options(-static-libgcc -nostdlib++ -Wl,-Bstatic -lc++ -lc++abi -Wl,-Bdynamic)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -nostdlib++ -Wl,-Bstatic -lc++ -lc++abi -Wl,-Bdynamic")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -static-libgcc -nostdlib++ -Wl,-Bstatic -lc++ -lc++abi -Wl,-Bdynamic")
endif()
add_link_options(-stdlib=libc++ -Wl,-build-id=sha1)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -stdlib=libc++ -Wl,-build-id=sha1")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -stdlib=libc++ -Wl,-build-id=sha1")
endif()
endif()
if (NOT APPLE AND NOT USE_LIBCXX)
message(STATUS "Linking libatomic")
add_link_options(-latomic)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -latomic")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -latomic")
endif()
if (OPEN_FOR_IDE)
add_compile_options(

View File

@ -232,7 +232,12 @@ set(COROUTINE_IMPL ${DEFAULT_COROUTINE_IMPL} CACHE STRING "Which coroutine imple
set(BUILD_AWS_BACKUP OFF CACHE BOOL "Build AWS S3 SDK backup client")
if (BUILD_AWS_BACKUP)
set(WITH_AWS_BACKUP ON)
if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
set(WITH_AWS_BACKUP ON)
else()
message(WARNING "BUILD_AWS_BACKUP is set but ignored: ${CMAKE_SYSTEM_PROCESSOR} is not supported yet")
set(WITH_AWS_BACKUP OFF)
endif()
else()
set(WITH_AWS_BACKUP OFF)
endif()

View File

@ -335,7 +335,12 @@ class TestRun:
command: List[str] = []
env: Dict[str, str] = os.environ.copy()
valgrind_file: Path | None = None
if self.use_valgrind:
if self.use_valgrind and self.binary == config.binary:
# Only run the binary under test under valgrind. There's nothing we
# can do about valgrind errors in old binaries anyway, and it makes
# the test take longer. Also old binaries weren't built with
# USE_VALGRIND=ON, and we have seen false positives with valgrind in
# such binaries.
command.append('valgrind')
valgrind_file = self.temp_path / Path('valgrind-{}.xml'.format(self.random_seed))
dbg_path = os.getenv('FDB_VALGRIND_DBGPATH')

View File

@ -0,0 +1,188 @@
/*
* BlobKeyCommand.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace {
ACTOR Future<bool> printBlobHistory(Database db, Key key, Optional<Version> version) {
fmt::print("Printing blob history for {0}", key.printable());
if (version.present()) {
fmt::print(" @ {0}", version.get());
}
fmt::print("\n");
state Transaction tr(db);
state KeyRange activeGranule;
state KeyRange queryRange(KeyRangeRef(key, keyAfter(key)));
loop {
try {
Standalone<VectorRef<KeyRangeRef>> granules = wait(tr.getBlobGranuleRanges(queryRange, 2));
if (granules.empty()) {
fmt::print("No active granule for {0}\n", key.printable());
return false;
}
ASSERT(granules.size() == 1);
activeGranule = granules[0];
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
fmt::print("Active granule: [{0} - {1})\n", activeGranule.begin.printable(), activeGranule.end.printable());
// get latest history entry for range
state GranuleHistory history;
loop {
try {
RangeResult result =
wait(tr.getRange(blobGranuleHistoryKeyRangeFor(activeGranule), 1, Snapshot::False, Reverse::True));
ASSERT(result.size() <= 1);
if (result.empty()) {
fmt::print("No history entry found\n");
return true;
}
std::pair<KeyRange, Version> decodedKey = decodeBlobGranuleHistoryKey(result[0].key);
ASSERT(activeGranule == decodedKey.first);
history = GranuleHistory(activeGranule, decodedKey.second, decodeBlobGranuleHistoryValue(result[0].value));
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
fmt::print("History:\n\n");
loop {
// print history
std::string boundaryChangeAction;
if (history.value.parentVersions.empty()) {
boundaryChangeAction = "root";
} else if (history.value.parentVersions.size() == 1) {
boundaryChangeAction = "split";
} else {
boundaryChangeAction = "merge";
}
fmt::print("{0}) {1}\n\t{2}\n\t{3}\n({4})\n\n",
history.version,
history.value.granuleID.toString(),
history.range.begin.printable(),
history.range.end.printable(),
boundaryChangeAction);
// traverse back
if (history.value.parentVersions.empty() || (version.present() && history.version <= version.get())) {
break;
}
int i;
for (i = 0; i < history.value.parentBoundaries.size(); i++) {
if (history.value.parentBoundaries[i] <= key) {
break;
}
}
// key should fall between boundaries
ASSERT(i < history.value.parentBoundaries.size());
KeyRangeRef parentRange(history.value.parentBoundaries[i], history.value.parentBoundaries[i + 1]);
Version parentVersion = history.value.parentVersions[i];
state Key parentHistoryKey = blobGranuleHistoryKeyFor(parentRange, parentVersion);
state bool foundParent;
loop {
try {
Optional<Value> parentHistoryValue = wait(tr.get(parentHistoryKey));
foundParent = parentHistoryValue.present();
if (foundParent) {
std::pair<KeyRange, Version> decodedKey = decodeBlobGranuleHistoryKey(parentHistoryKey);
history = GranuleHistory(
decodedKey.first, decodedKey.second, decodeBlobGranuleHistoryValue(parentHistoryValue.get()));
}
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
if (!foundParent) {
break;
}
}
fmt::print("Done\n");
return true;
}
} // namespace
namespace fdb_cli {
ACTOR Future<bool> blobKeyCommandActor(Database localDb,
Optional<TenantMapEntry> tenantEntry,
std::vector<StringRef> tokens) {
// prints the blob granule history for the given key
if (tokens.size() != 3 && tokens.size() != 4) {
printUsage(tokens[0]);
return false;
}
ASSERT(tokens[1] == "history"_sr);
Key key;
Optional<Version> version;
if (tenantEntry.present()) {
key = tokens[2].withPrefix(tenantEntry.get().prefix);
} else {
key = tokens[2];
}
if (tokens.size() > 3) {
Version v;
int n = 0;
if (sscanf(tokens[3].toString().c_str(), "%" PRId64 "%n", &v, &n) != 1 || n != tokens[3].size()) {
printUsage(tokens[0]);
return false;
}
version = v;
}
if (key >= LiteralStringRef("\xff")) {
fmt::print("No blob history for system keyspace\n", key.printable());
return false;
} else {
bool result = wait(printBlobHistory(localDb, key, version));
return result;
}
}
// can extend to other blobkey commands later
CommandFactory blobKeyFactory("blobkey", CommandHelp("blobkey history <key> [version]", "", ""));
} // namespace fdb_cli
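Usage, per the CommandHelp string registered above: blobkey history <key> [version]. For example, blobkey history some_key prints the full granule lineage for that key, while blobkey history some_key 12345 stops traversing once it reaches a history entry at or before version 12345 (the key and version here are made up for illustration).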

View File

@ -1,3 +1,4 @@
include(AddFdbTest)
fdb_find_sources(FDBCLI_SRCS)
add_flow_target(EXECUTABLE NAME fdbcli SRCS ${FDBCLI_SRCS})
@ -23,3 +24,38 @@ if(NOT OPEN_FOR_IDE)
fdb_install(PROGRAMS ${CMAKE_BINARY_DIR}/packages/bin/fdbcli DESTINATION bin COMPONENT clients)
endif()
endif()
if (NOT WIN32 AND NOT OPEN_FOR_IDE)
add_dependencies(fdbcli external_client)
add_fdbclient_test(
NAME single_process_fdbcli_tests
COMMAND ${CMAKE_SOURCE_DIR}/fdbcli/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
)
add_fdbclient_test(
NAME multi_process_fdbcli_tests
PROCESS_NUMBER 5
COMMAND ${CMAKE_SOURCE_DIR}/fdbcli/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
5
)
add_fdbclient_test(
NAME single_process_external_client_fdbcli_tests
COMMAND ${CMAKE_SOURCE_DIR}/fdbcli/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
--external-client-library ${CMAKE_BINARY_DIR}/bindings/c/libfdb_c_external.so
)
add_fdbclient_test(
NAME multi_process_external_client_fdbcli_tests
PROCESS_NUMBER 5
COMMAND ${CMAKE_SOURCE_DIR}/fdbcli/tests/fdbcli_tests.py
${CMAKE_BINARY_DIR}
@CLUSTER_FILE@
5
--external-client-library ${CMAKE_BINARY_DIR}/bindings/c/libfdb_c_external.so
)
endif()

View File

@ -1390,6 +1390,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
continue;
}
if (tokencmp(tokens[0], "blobkey")) {
bool _result = wait(makeInterruptable(blobKeyCommandActor(localDb, tenantEntry, tokens)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "unlock")) {
if ((tokens.size() != 2) || (tokens[1].size() != 32) ||
!std::all_of(tokens[1].begin(), tokens[1].end(), &isxdigit)) {

View File

@ -208,6 +208,11 @@ ACTOR Future<bool> changeFeedCommandActor(Database localDb,
ACTOR Future<bool> blobRangeCommandActor(Database localDb,
Optional<TenantMapEntry> tenantEntry,
std::vector<StringRef> tokens);
// blobkey command
ACTOR Future<bool> blobKeyCommandActor(Database localDb,
Optional<TenantMapEntry> tenantEntry,
std::vector<StringRef> tokens);
// maintenance command
ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db, StringRef zoneId, double seconds, bool printWarning = false);
ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,

View File

@ -1545,7 +1545,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext) {
ReadBlobGranuleContext granuleContext,
GranuleMaterializeStats& stats) {
int64_t parallelism = granuleContext.granuleParallelism;
if (parallelism < 1) {
parallelism = 1;
@ -1555,6 +1556,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
}
GranuleLoadIds loadIds[files.size()];
int64_t inputBytes = 0;
int64_t outputBytes = 0;
try {
// Kick off first file reads if parallelism > 1
@ -1579,6 +1582,7 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
if (!snapshotData.get().begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
inputBytes += snapshotData.get().size();
}
// +1 to avoid UBSAN variable length array of size zero
@ -1591,18 +1595,25 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
if (!deltaData[i].begin()) {
return ErrorOr<RangeResult>(blob_granule_file_load_error());
}
inputBytes += deltaData[i].size();
}
inputBytes += files[chunkIdx].newDeltas.expectedSize();
// materialize rows from chunk
chunkRows =
materializeBlobGranule(files[chunkIdx], keyRange, beginVersion, readVersion, snapshotData, deltaData);
outputBytes += chunkRows.expectedSize();
results.arena().dependsOn(chunkRows.arena());
results.append(results.arena(), chunkRows.begin(), chunkRows.size());
// free once done by forcing FreeHandles to trigger
loadIds[chunkIdx].freeHandles.clear();
}
stats.inputBytes = inputBytes;
stats.outputBytes = outputBytes;
return ErrorOr<RangeResult>(results);
} catch (Error& e) {
return ErrorOr<RangeResult>(e);
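The accounting added above counts serialized bytes in (snapshot files, delta files, and the in-memory newDeltas) against materialized row bytes out, which makes read amplification visible per request. A rough standalone illustration of the flow (the byte counts are made up; the struct mirrors the GranuleMaterializeStats added in BlobGranuleCommon.h):

#include <cstdint>
#include <iostream>

struct GranuleMaterializeStats {
	int64_t inputBytes = 0; // snapshot + delta file bytes + in-memory deltas
	int64_t outputBytes = 0; // expectedSize() of the materialized rows
};

int main() {
	GranuleMaterializeStats stats;
	// Per-chunk accumulation, as in loadAndMaterializeBlobGranules above:
	stats.inputBytes += int64_t(1) << 20; // one snapshot file
	stats.inputBytes += int64_t(64) << 10; // one delta file
	stats.outputBytes += int64_t(900) << 10; // materialized rows
	// The transaction then folds these into the client's BGReadInputBytes /
	// BGReadOutputBytes counters via addGranuleMaterializeStats().
	std::cout << stats.inputBytes << " bytes in, " << stats.outputBytes << " bytes out\n";
}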

View File

@ -1470,13 +1470,13 @@ DatabaseContext::DatabaseContext(Reference<AsyncVar<Reference<IClusterConnection
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000),
outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
bgGranulesPerRequest(1000), outstandingWatches(0), sharedStatePtr(nullptr), lastGrvTime(0.0), cachedReadVersion(0),
lastRkBatchThrottleTime(0.0), lastRkDefaultThrottleTime(0.0), lastProxyRequestTime(0.0),
transactionTracingSample(false), taskID(taskID), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor),
coordinator(coordinator), apiVersion(_apiVersion), mvCacheInsertLocation(0), healthMetricsLastUpdated(0),
@ -1770,13 +1770,13 @@ DatabaseContext::DatabaseContext(const Error& err)
transactionsProcessBehind("ProcessBehind", cc), transactionsThrottled("Throttled", cc),
transactionsExpensiveClearCostEstCount("ExpensiveClearCostEstCount", cc),
transactionGrvFullBatches("NumGrvFullBatches", cc), transactionGrvTimedOutBatches("NumGrvTimedOutBatches", cc),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), usedAnyChangeFeeds(false),
ccFeed("ChangeFeedClientMetrics"), feedStreamStarts("FeedStreamStarts", ccFeed),
feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed), feedErrors("FeedErrors", ccFeed),
feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed), feedPops("FeedPops", ccFeed),
feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000), bgGranulesPerRequest(1000),
sharedStatePtr(nullptr), transactionTracingSample(false),
transactionCommitVersionNotFoundForSS("CommitVersionNotFoundForSS", cc), bgReadInputBytes("BGReadInputBytes", cc),
bgReadOutputBytes("BGReadOutputBytes", cc), usedAnyChangeFeeds(false), ccFeed("ChangeFeedClientMetrics"),
feedStreamStarts("FeedStreamStarts", ccFeed), feedMergeStreamStarts("FeedMergeStreamStarts", ccFeed),
feedErrors("FeedErrors", ccFeed), feedNonRetriableErrors("FeedNonRetriableErrors", ccFeed),
feedPops("FeedPops", ccFeed), feedPopsFallback("FeedPopsFallback", ccFeed), latencies(1000), readLatencies(1000),
commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), bgLatencies(1000),
bgGranulesPerRequest(1000), sharedStatePtr(nullptr), transactionTracingSample(false),
smoothMidShardSize(CLIENT_KNOBS->SHARD_STAT_SMOOTH_AMOUNT),
connectToDatabaseEventCacheHolder(format("ConnectToDatabase/%s", dbId.toString().c_str())) {}
@ -3936,7 +3936,6 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
req.version = version;
req.begin = firstGreaterOrEqual(range.begin);
req.end = firstGreaterOrEqual(range.end);
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
req.spanContext = span.context;
trState->cx->getLatestCommitVersions(
@ -8077,6 +8076,11 @@ Transaction::summarizeBlobGranules(const KeyRange& range, Optional<Version> summ
return summarizeBlobGranulesActor(this, range, summaryVersion, rangeLimit);
}
void Transaction::addGranuleMaterializeStats(const GranuleMaterializeStats& stats) {
trState->cx->bgReadInputBytes += stats.inputBytes;
trState->cx->bgReadOutputBytes += stats.outputBytes;
}
ACTOR Future<Version> setPerpetualStorageWiggle(Database cx, bool enable, LockAware lockAware) {
state ReadYourWritesTransaction tr(cx);
state Version version = invalidVersion;

View File

@ -1842,6 +1842,13 @@ Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> ReadYourWritesTransaction::
return waitOrError(tr.summarizeBlobGranules(range, summaryVersion, rangeLimit), resetPromise.getFuture());
}
void ReadYourWritesTransaction::addGranuleMaterializeStats(const GranuleMaterializeStats& stats) {
if (checkUsedDuringCommit()) {
throw used_during_commit();
}
tr.addGranuleMaterializeStats(stats);
}
void ReadYourWritesTransaction::addReadConflictRange(KeyRangeRef const& keys) {
if (checkUsedDuringCommit()) {
throw used_during_commit();

View File

@ -688,8 +688,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( BW_THROTTLING_ENABLED, true );
bool buggifySmallBWLag = randomize && BUGGIFY;
init( TARGET_BW_LAG, 50.0 ); if(buggifySmallBWLag) TARGET_BW_LAG = 10.0;
init( TARGET_BW_LAG_BATCH, 30.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
init( TARGET_BW_LAG, 240.0 ); if(buggifySmallBWLag) TARGET_BW_LAG = 10.0;
init( TARGET_BW_LAG_BATCH, 200.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_BATCH = 4.0;
init( TARGET_BW_LAG_UPDATE, 9.0 ); if(buggifySmallBWLag) TARGET_BW_LAG_UPDATE = 1.0;
init( MIN_BW_HISTORY, 10 );
init( BW_ESTIMATION_INTERVAL, 10.0 ); if(buggifySmallBWLag) BW_ESTIMATION_INTERVAL = 2.0;
@ -783,10 +783,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 );
init( STORAGE_SERVER_READ_CONCURRENCY, 70 );
// Priorities which each ReadType maps to, in enumeration order
init( STORAGESERVER_READ_RANKS, "0,2,1,1,1" );
init( STORAGESERVER_READ_PRIORITIES, "48,32,8" );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
@ -898,6 +894,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );
init( REDWOOD_DEFAULT_EXTENT_READ_SIZE, 1024 * 1024 );
init( REDWOOD_EXTENT_CONCURRENT_READS, 4 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
init( REDWOOD_KVSTORE_RANGE_PREFETCH, true );
init( REDWOOD_PAGE_REBUILD_MAX_SLACK, 0.33 );
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
@ -910,7 +907,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
init( REDWOOD_PRIORITY_LAUNCHS, "32,32,32,32" );
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );

View File

@ -454,7 +454,13 @@ ThreadResult<RangeResult> ThreadSafeTransaction::readBlobGranulesFinish(
ReadBlobGranuleContext granuleContext) {
// do this work off of fdb network threads for performance!
Standalone<VectorRef<BlobGranuleChunkRef>> files = startFuture.get();
return loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersion, granuleContext);
GranuleMaterializeStats stats;
auto ret = loadAndMaterializeBlobGranules(files, keyRange, beginVersion, readVersion, granuleContext, stats);
if (!ret.isError()) {
ISingleThreadTransaction* tr = this->tr;
onMainThreadVoid([tr, stats]() { tr->addGranuleMaterializeStats(stats); });
}
return ret;
}
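Because readBlobGranulesFinish runs off the network thread, the stats above are copied by value into the callback that onMainThreadVoid later runs on the network thread; copying the small POD sidesteps lifetime and locking concerns. A generic sketch of that hand-off, with std::thread standing in for the network thread (illustrative only):

#include <cstdint>
#include <iostream>
#include <thread>

struct Stats {
	int64_t inputBytes = 0;
	int64_t outputBytes = 0;
};

int main() {
	Stats stats{ 100, 42 }; // computed on a worker thread
	// Capture by value: the worker may move on (or exit) before the
	// "network thread" runs the callback, and the copy stays valid.
	std::thread networkThread([stats]() {
		std::cout << "recorded " << stats.inputBytes << "/" << stats.outputBytes << "\n";
	});
	networkThread.join();
}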
ThreadFuture<Standalone<VectorRef<BlobGranuleSummaryRef>>> ThreadSafeTransaction::summarizeBlobGranules(

View File

@ -22,6 +22,8 @@
#include "flow/UnitTest.h"
const uint8_t VERSIONSTAMP_96_CODE = 0x33;
const uint8_t USER_TYPE_START = 0x40;
const uint8_t USER_TYPE_END = 0x4f;
// TODO: Many functions copied from bindings/flow/Tuple.cpp. Merge at some point.
static float bigEndianFloat(float orig) {
@ -59,7 +61,7 @@ static void adjustFloatingPoint(uint8_t* bytes, size_t size, bool encode) {
}
}
Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
Tuple::Tuple(StringRef const& str, bool exclude_incomplete, bool include_user_type) {
data.append(data.arena(), str.begin(), str.size());
size_t i = 0;
@ -80,6 +82,9 @@ Tuple::Tuple(StringRef const& str, bool exclude_incomplete) {
i += 1;
} else if (data[i] == VERSIONSTAMP_96_CODE) {
i += VERSIONSTAMP_TUPLE_SIZE + 1;
} else if (include_user_type && isUserType(data[i])) {
// User defined codes must come at the end of a Tuple and are not delimited.
i = data.size();
} else {
throw invalid_tuple_data_type();
}
@ -94,6 +99,14 @@ Tuple Tuple::unpack(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete);
}
Tuple Tuple::unpackUserType(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete, true);
}
bool Tuple::isUserType(uint8_t code) const {
return code >= USER_TYPE_START && code <= USER_TYPE_END;
}
Tuple& Tuple::append(Tuple const& tuple) {
for (size_t offset : tuple.offsets) {
offsets.push_back(offset + data.size());
@ -218,6 +231,15 @@ Tuple& Tuple::appendNull() {
return append(nullptr);
}
Tuple& Tuple::append(Tuple::UserTypeStr const& udt) {
offsets.push_back(data.size());
ASSERT(isUserType(udt.code));
data.push_back(data.arena(), udt.code);
data.append(data.arena(), udt.str.begin(), udt.str.size());
return *this;
}
Tuple::ElementType Tuple::getType(size_t index) const {
if (index >= offsets.size()) {
throw invalid_tuple_index();
@ -241,6 +263,8 @@ Tuple::ElementType Tuple::getType(size_t index) const {
return ElementType::BOOL;
} else if (code == VERSIONSTAMP_96_CODE) {
return ElementType::VERSIONSTAMP;
} else if (isUserType(code)) {
return ElementType::USER_TYPE;
} else {
throw invalid_tuple_data_type();
}
@ -401,6 +425,29 @@ Versionstamp Tuple::getVersionstamp(size_t index) const {
return Versionstamp(StringRef(data.begin() + offsets[index] + 1, VERSIONSTAMP_TUPLE_SIZE));
}
Tuple::UserTypeStr Tuple::getUserType(size_t index) const {
// Valid index.
if (index >= offsets.size()) {
throw invalid_tuple_index();
}
// Valid user type code.
ASSERT_LT(offsets[index], data.size());
uint8_t code = data[offsets[index]];
if (!isUserType(code)) {
throw invalid_tuple_data_type();
}
size_t start = offsets[index] + 1;
Standalone<StringRef> str;
VectorRef<uint8_t> staging;
staging.append(str.arena(), data.begin() + start, data.size() - start);
str.StringRef::operator=(StringRef(staging.begin(), staging.size()));
return Tuple::UserTypeStr(code, str);
}
KeyRange Tuple::range(Tuple const& tuple) const {
VectorRef<uint8_t> begin;
VectorRef<uint8_t> end;
@ -440,9 +487,16 @@ StringRef Tuple::subTupleRawString(size_t index) const {
return StringRef(data.begin() + offsets[index], endPos - offsets[index]);
}
TEST_CASE("fdbclient/Tuple/makeTuple") {
Tuple t1 = Tuple::makeTuple(
1, 1.0f, 1.0, false, "byteStr"_sr, Tuple::UnicodeStr("str"_sr), nullptr, Versionstamp("000000000000"_sr));
TEST_CASE("/fdbclient/Tuple/makeTuple") {
Tuple t1 = Tuple::makeTuple(1,
1.0f,
1.0,
false,
"byteStr"_sr,
Tuple::UnicodeStr("str"_sr),
nullptr,
Versionstamp("000000000000"_sr),
Tuple::UserTypeStr(0x41, "12345678"_sr));
Tuple t2 = Tuple()
.append(1)
.append(1.0f)
@ -451,7 +505,8 @@ TEST_CASE("fdbclient/Tuple/makeTuple") {
.append("byteStr"_sr)
.append(Tuple::UnicodeStr("str"_sr))
.append(nullptr)
.append(Versionstamp("000000000000"_sr));
.append(Versionstamp("000000000000"_sr))
.append(Tuple::UserTypeStr(0x41, "12345678"_sr));
ASSERT(t1.pack() == t2.pack());
ASSERT(t1.getType(0) == Tuple::INT);
@ -462,7 +517,45 @@ TEST_CASE("fdbclient/Tuple/makeTuple") {
ASSERT(t1.getType(5) == Tuple::UTF8);
ASSERT(t1.getType(6) == Tuple::NULL_TYPE);
ASSERT(t1.getType(7) == Tuple::VERSIONSTAMP);
ASSERT(t1.size() == 8);
ASSERT(t1.getType(8) == Tuple::USER_TYPE);
ASSERT(t1.size() == 9);
return Void();
}
TEST_CASE("/fdbclient/Tuple/unpack") {
Tuple t1 = Tuple::makeTuple(1,
1.0f,
1.0,
false,
"byteStr"_sr,
Tuple::UnicodeStr("str"_sr),
nullptr,
Versionstamp("000000000000"_sr),
Tuple::UserTypeStr(0x41, "12345678"_sr));
Standalone<StringRef> packed = t1.pack();
Tuple t2 = Tuple::unpackUserType(packed);
ASSERT(t2.pack() == t1.pack());
ASSERT(t2.getInt(0) == t1.getInt(0));
ASSERT(t2.getFloat(1) == t1.getFloat(1));
ASSERT(t2.getDouble(2) == t1.getDouble(2));
ASSERT(t2.getBool(3) == t1.getBool(3));
ASSERT(t2.getString(4) == t1.getString(4));
ASSERT(t2.getString(5) == t1.getString(5));
ASSERT(t2.getType(6) == Tuple::NULL_TYPE);
ASSERT(t2.getVersionstamp(7) == t1.getVersionstamp(7));
ASSERT(t2.getUserType(8) == t1.getUserType(8));
ASSERT(t2.size() == 9);
try {
Tuple t3 = Tuple::unpack(packed);
ASSERT(false);
} catch (Error& e) {
if (e.code() != error_code_invalid_tuple_data_type) {
throw e;
}
}
return Void();
}
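For reference, the layout the new test exercises: a user-type element begins with a code in [USER_TYPE_START, USER_TYPE_END] = [0x40, 0x4f] and its payload runs undelimited to the end of the packed bytes, which is why it must be the last element. A rough byte-level sketch (0x15 is the tuple layer's one-byte positive integer code; the example values are arbitrary):

Tuple::makeTuple(1, Tuple::UserTypeStr(0x41, "ab"_sr)).pack() yields:
    0x15 0x01        // INT 1 (one-byte positive integer)
    0x41 0x61 0x62   // user-type code 0x41, then the raw "ab" payload, no terminator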

View File

@ -55,6 +55,13 @@ struct GranuleDeltas : VectorRef<MutationsAndVersionRef> {
}
};
struct GranuleMaterializeStats {
int64_t inputBytes;
int64_t outputBytes;
GranuleMaterializeStats() : inputBytes(0), outputBytes(0) {}
};
struct BlobGranuleCipherKeysMeta {
EncryptCipherDomainId textDomainId;
EncryptCipherBaseKeyId textBaseCipherId;
@ -276,6 +283,17 @@ struct BlobGranuleHistoryValue {
}
};
struct GranuleHistory {
KeyRange range;
Version version;
Standalone<BlobGranuleHistoryValue> value;
GranuleHistory() {}
GranuleHistory(KeyRange range, Version version, Standalone<BlobGranuleHistoryValue> value)
: range(range), version(version), value(value) {}
};
// A manifest to assist full fdb restore from blob granule files
struct BlobManifest {
constexpr static FileIdentifier file_identifier = 298872;

View File

@ -43,7 +43,8 @@ ErrorOr<RangeResult> loadAndMaterializeBlobGranules(const Standalone<VectorRef<B
const KeyRangeRef& keyRange,
Version beginVersion,
Version readVersion,
ReadBlobGranuleContext granuleContext);
ReadBlobGranuleContext granuleContext,
GranuleMaterializeStats& stats);
RangeResult materializeBlobGranule(const BlobGranuleChunkRef& chunk,
KeyRangeRef keyRange,

View File

@ -544,6 +544,8 @@ public:
Counter transactionGrvFullBatches;
Counter transactionGrvTimedOutBatches;
Counter transactionCommitVersionNotFoundForSS;
Counter bgReadInputBytes;
Counter bgReadOutputBytes;
// Change Feed metrics. Omit change feed metrics from logging if not used
bool usedAnyChangeFeeds;

View File

@ -1581,7 +1581,13 @@ struct StorageWiggleValue {
}
};
enum class ReadType { EAGER = 0, FETCH = 1, LOW = 2, NORMAL = 3, HIGH = 4, MIN = EAGER, MAX = HIGH };
enum class ReadType {
EAGER,
FETCH,
LOW,
NORMAL,
HIGH,
};
FDB_DECLARE_BOOLEAN_PARAM(CacheResult);

View File

@ -70,6 +70,7 @@ public:
throw client_invalid_operation();
}
Future<int64_t> getEstimatedRangeSizeBytes(KeyRange const& keys) override { throw client_invalid_operation(); }
void addGranuleMaterializeStats(const GranuleMaterializeStats& stats) override { throw client_invalid_operation(); }
void addReadConflictRange(KeyRangeRef const& keys) override { throw client_invalid_operation(); }
void makeSelfConflicting() override { throw client_invalid_operation(); }
void atomicOp(KeyRef const& key, ValueRef const& operand, uint32_t operationType) override {

View File

@ -88,6 +88,7 @@ public:
virtual Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> summarizeBlobGranules(KeyRange const& range,
Optional<Version> summaryVersion,
int rangeLimit) = 0;
virtual void addGranuleMaterializeStats(const GranuleMaterializeStats& stats) = 0;
virtual void addReadConflictRange(KeyRangeRef const& keys) = 0;
virtual void makeSelfConflicting() = 0;
virtual void atomicOp(KeyRef const& key, ValueRef const& operand, uint32_t operationType) = 0;

View File

@ -423,6 +423,8 @@ public:
Optional<Version> summaryVersion,
int rangeLimit);
void addGranuleMaterializeStats(const GranuleMaterializeStats& stats);
// If checkWriteConflictRanges is true, existing write conflict ranges will be searched for this key
void set(const KeyRef& key, const ValueRef& value, AddConflictRange = AddConflictRange::True);
void atomicOp(const KeyRef& key,
@ -477,7 +479,6 @@ public:
Database getDatabase() const { return trState->cx; }
static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
Transaction& getTransaction() { return *this; }
void setTransactionID(UID id);
void setToken(uint64_t token);

View File

@ -130,6 +130,7 @@ public:
Future<Standalone<VectorRef<BlobGranuleSummaryRef>>> summarizeBlobGranules(const KeyRange& range,
Optional<Version> summaryVersion,
int rangeLimit) override;
void addGranuleMaterializeStats(const GranuleMaterializeStats& stats) override;
void addReadConflictRange(KeyRangeRef const& keys) override;
void makeSelfConflicting() override { tr.makeSelfConflicting(); }

View File

@ -735,9 +735,6 @@ public:
int QUICK_GET_KEY_VALUES_LIMIT;
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
int STORAGE_FEED_QUERY_HARD_LIMIT;
int STORAGE_SERVER_READ_CONCURRENCY;
std::string STORAGESERVER_READ_RANKS;
std::string STORAGESERVER_READ_PRIORITIES;
// Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
@ -867,6 +864,7 @@ public:
int REDWOOD_DEFAULT_EXTENT_SIZE; // Extent size for new Redwood files
int REDWOOD_DEFAULT_EXTENT_READ_SIZE; // Extent read size for Redwood files
int REDWOOD_EXTENT_CONCURRENT_READS; // Max number of simultaneous extent disk reads in progress.
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
bool REDWOOD_KVSTORE_RANGE_PREFETCH; // Whether to use range read prefetching
double REDWOOD_PAGE_REBUILD_MAX_SLACK; // When rebuilding pages, max slack to allow in page
int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at
@ -885,8 +883,6 @@ public:
bool REDWOOD_EVICT_UPDATED_PAGES; // Whether to prioritize eviction of updated pages from cache.
int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
std::string REDWOOD_PRIORITY_LAUNCHS;
// Server request latency measurement
int LATENCY_SAMPLE_SIZE;
double LATENCY_METRICS_LOGGING_INTERVAL;

View File

@ -33,6 +33,14 @@ struct Tuple {
explicit UnicodeStr(StringRef str) : str(str) {}
};
struct UserTypeStr {
uint8_t code;
Standalone<StringRef> str;
UserTypeStr(uint8_t code, StringRef str) : code(code), str(str) {}
bool operator==(const UserTypeStr& other) const { return (code == other.code && str == other.str); }
};
Tuple() {}
// Tuple parsing normally does not care if the final value is a numeric type and is incomplete.
@ -40,6 +48,7 @@ struct Tuple {
// Note that strings can't be incomplete because they are parsed such that the end of the packed
// byte string is considered the end of the string in lieu of a specific end.
static Tuple unpack(StringRef const& str, bool exclude_incomplete = false);
static Tuple unpackUserType(StringRef const& str, bool exclude_incomplete = false);
Tuple& append(Tuple const& tuple);
@ -55,6 +64,7 @@ struct Tuple {
Tuple& append(std::nullptr_t);
Tuple& appendNull();
Tuple& append(Versionstamp const&);
Tuple& append(UserTypeStr const&);
Standalone<StringRef> pack() const {
return Standalone<StringRef>(StringRef(data.begin(), data.size()), data.arena());
@ -65,7 +75,9 @@ struct Tuple {
return append(t);
}
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE, VERSIONSTAMP };
enum ElementType { NULL_TYPE, INT, BYTES, UTF8, BOOL, FLOAT, DOUBLE, VERSIONSTAMP, USER_TYPE };
bool isUserType(uint8_t code) const;
// this is number of elements, not length of data
size_t size() const { return offsets.size(); }
@ -85,6 +97,7 @@ struct Tuple {
bool getBool(size_t index) const;
float getFloat(size_t index) const;
double getDouble(size_t index) const;
Tuple::UserTypeStr getUserType(size_t index) const;
KeyRange range(Tuple const& tuple = Tuple()) const;
@ -107,7 +120,7 @@ struct Tuple {
}
private:
Tuple(const StringRef& data, bool exclude_incomplete = false);
Tuple(const StringRef& data, bool exclude_incomplete = false, bool include_user_type = false);
Standalone<VectorRef<uint8_t>> data;
std::vector<size_t> offsets;
};

View File

@ -451,7 +451,13 @@ public:
int physicalDatacenters;
int processesPerMachine;
int listenersPerProcess;
// We won't kill machines in this set, but we might reboot
// them. This is a conservative mechanism to prevent the
// simulator from killing off important processes (e.g. a quorum
// of coordinators) and rendering the cluster unrecoverable.
std::set<NetworkAddress> protectedAddresses;
std::map<NetworkAddress, ProcessInfo*> currentlyRebootingProcesses;
std::vector<std::string> extraDatabases;
Reference<IReplicationPolicy> storagePolicy;

View File

@ -1688,15 +1688,20 @@ public:
}
void killProcess(ProcessInfo* machine, KillType kt) override {
TraceEvent("AttemptingKillProcess").detail("ProcessInfo", machine->toString());
if (kt < RebootAndDelete) {
// Refuse to kill a protected process.
if (kt < RebootAndDelete && protectedAddresses.count(machine->address) == 0) {
killProcess_internal(machine, kt);
}
}
void killInterface(NetworkAddress address, KillType kt) override {
if (kt < RebootAndDelete) {
std::vector<ProcessInfo*>& processes = machines[addressMap[address]->locality.machineId()].processes;
for (int i = 0; i < processes.size(); i++)
killProcess_internal(processes[i], kt);
for (auto& process : processes) {
// Refuse to kill a protected process.
if (protectedAddresses.count(process->address) == 0) {
killProcess_internal(process, kt);
}
}
}
}
bool killZone(Optional<Standalone<StringRef>> zoneId, KillType kt, bool forceKill, KillType* ktFinal) override {

View File

@ -508,7 +508,7 @@ static void alignKeyBoundary(Reference<BlobManagerData> bmData,
alignedKey = alignedKey.removePrefix(tenantData->entry.prefix);
}
try {
t = Tuple::unpack(alignedKey, true);
t = Tuple::unpackUserType(alignedKey, true);
if (t.size() > offset) {
t2 = t.subTuple(0, t.size() - offset);
alignedKey = t2.pack();

View File

@ -2437,10 +2437,13 @@ ACTOR Future<Void> blobGranuleUpdateFiles(Reference<BlobWorkerData> bwData,
previousFuture = Future<BlobFileIndex>(BlobFileIndex());
}
// The last version included in the old change feed is startState.changeFeedStartVersion - 1.
// So if the previous delta file did not include this version, and the new delta file does, the old
// change feed is considered complete. For example, with changeFeedStartVersion = 100 the old feed
// is complete once a delta file covers version 99: pendingDeltaVersion <= 98 beforehand and
// lastDeltaVersion >= 99 now, which is exactly the +1 condition below.
Optional<std::pair<KeyRange, UID>> oldChangeFeedDataComplete;
if (startState.splitParentGranule.present() &&
metadata->pendingDeltaVersion < startState.changeFeedStartVersion &&
lastDeltaVersion >= startState.changeFeedStartVersion) {
metadata->pendingDeltaVersion + 1 < startState.changeFeedStartVersion &&
lastDeltaVersion + 1 >= startState.changeFeedStartVersion) {
oldChangeFeedDataComplete = startState.splitParentGranule.get();
}

View File

@ -1112,7 +1112,7 @@ ACTOR Future<Void> readTransactionSystemState(Reference<ClusterRecoveryData> sel
self->recoveryTransactionVersion += deterministicRandom()->randomInt64(0, 10000000);
}
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_RECOVERED_EVENT_NAME).c_str(),
TraceEvent(getRecoveryEventName(ClusterRecoveryEventType::CLUSTER_RECOVERY_RECOVERING_EVENT_NAME).c_str(),
self->dbgid)
.detail("LastEpochEnd", self->lastEpochEnd)
.detail("RecoveryTransactionVersion", self->recoveryTransactionVersion);

View File

@ -728,7 +728,7 @@ struct DDQueue : public IDDRelocationQueue {
DDQueue(UID mid,
MoveKeysLock lock,
Database cx,
const std::shared_ptr<IDDTxnProcessor>& db,
std::vector<TeamCollectionInterface> teamCollections,
Reference<ShardsAffectedByTeamFailure> sABTF,
Reference<PhysicalShardCollection> physicalShardCollection,
@ -739,7 +739,7 @@ struct DDQueue : public IDDRelocationQueue {
FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetTopKMetricsRequest> getTopKMetrics)
: IDDRelocationQueue(), distributorId(mid), lock(lock), cx(cx), txnProcessor(new DDTxnProcessor(cx)),
: IDDRelocationQueue(), distributorId(mid), lock(lock), cx(db->getDb()), txnProcessor(db),
teamCollections(teamCollections), shardsAffectedByTeamFailure(sABTF),
physicalShardCollection(physicalShardCollection), getAverageShardBytes(getAverageShardBytes),
startMoveKeysParallelismLock(SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM),
@ -1293,6 +1293,7 @@ struct DDQueue : public IDDRelocationQueue {
// Schedules cancellation of a data move.
void enqueueCancelledDataMove(UID dataMoveId, KeyRange range, const DDEnabledState* ddEnabledState) {
ASSERT(!txnProcessor->isMocked()); // the mock implementation currently doesn't support data move
std::vector<Future<Void>> cleanup;
auto f = this->dataMoves.intersectingRanges(range);
for (auto it = f.begin(); it != f.end(); ++it) {
@ -1324,6 +1325,24 @@ struct DDQueue : public IDDRelocationQueue {
}
int getUnhealthyRelocationCount() override { return unhealthyRelocations; }
Future<SrcDestTeamPair> getSrcDestTeams(const int& teamCollectionIndex,
const GetTeamRequest& srcReq,
const GetTeamRequest& destReq,
const int& priority,
TraceEvent* traceEvent);
Future<bool> rebalanceReadLoad(DataMovementReason moveReason,
Reference<IDataDistributionTeam> sourceTeam,
Reference<IDataDistributionTeam> destTeam,
bool primary,
TraceEvent* traceEvent);
Future<bool> rebalanceTeams(DataMovementReason moveReason,
Reference<IDataDistributionTeam const> sourceTeam,
Reference<IDataDistributionTeam const> destTeam,
bool primary,
TraceEvent* traceEvent);
};
ACTOR Future<Void> cancelDataMove(struct DDQueue* self, KeyRange range, const DDEnabledState* ddEnabledState) {
@ -2016,7 +2035,7 @@ ACTOR Future<bool> rebalanceReadLoad(DDQueue* self,
}
// randomly choose topK shards
int topK = std::max(1, std::min(int(0.1 * shards.size()), SERVER_KNOBS->READ_REBALANCE_SHARD_TOPK));
state Future<HealthMetrics> healthMetrics = self->cx->getHealthMetrics(true);
state Future<HealthMetrics> healthMetrics = self->txnProcessor->getHealthMetrics(true);
state GetTopKMetricsRequest req(
shards, topK, (srcLoad - destLoad) * SERVER_KNOBS->READ_REBALANCE_MAX_SHARD_FRAC, srcLoad / shards.size());
state GetTopKMetricsReply reply = wait(brokenPromiseToNever(self->getTopKMetrics.getReply(req)));
@ -2164,16 +2183,60 @@ ACTOR Future<SrcDestTeamPair> getSrcDestTeams(DDQueue* self,
return {};
}
Future<SrcDestTeamPair> DDQueue::getSrcDestTeams(const int& teamCollectionIndex,
const GetTeamRequest& srcReq,
const GetTeamRequest& destReq,
const int& priority,
TraceEvent* traceEvent) {
return ::getSrcDestTeams(this, teamCollectionIndex, srcReq, destReq, priority, traceEvent);
}
Future<bool> DDQueue::rebalanceReadLoad(DataMovementReason moveReason,
Reference<IDataDistributionTeam> sourceTeam,
Reference<IDataDistributionTeam> destTeam,
bool primary,
TraceEvent* traceEvent) {
return ::rebalanceReadLoad(this, moveReason, sourceTeam, destTeam, primary, traceEvent);
}
Future<bool> DDQueue::rebalanceTeams(DataMovementReason moveReason,
Reference<const IDataDistributionTeam> sourceTeam,
Reference<const IDataDistributionTeam> destTeam,
bool primary,
TraceEvent* traceEvent) {
return ::rebalanceTeams(this, moveReason, sourceTeam, destTeam, primary, traceEvent);
}
ACTOR Future<bool> getSkipRebalanceValue(std::shared_ptr<IDDTxnProcessor> txnProcessor, bool readRebalance) {
Optional<Value> val = wait(txnProcessor->readRebalanceDDIgnoreKey());
if (!val.present())
return false;
bool skipCurrentLoop = false;
// NOTE: the special values "" and "on" might have been written by old versions (< 7.2)
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
if (readRebalance) {
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
} else {
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
}
} else {
skipCurrentLoop = true;
}
return skipCurrentLoop;
}
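A worked reading of the decoding above (the concrete bit values live in the DDIgnore enum and are not shown here):

    value ""                       -> skip both balancers
    value "on"                     -> same; legacy form written by versions < 7.2
    value with REBALANCE_READ set  -> only the read balancer skips its loop
    value with REBALANCE_DISK set  -> only the disk balancer skips its loop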
ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, DataMovementReason reason) {
state int resetCount = SERVER_KNOBS->DD_REBALANCE_RESET_AMOUNT;
state Transaction tr(self->cx);
state int resetCount = 0;
state double lastRead = 0;
state bool skipCurrentLoop = false;
state Future<Void> delayF = Never();
state const bool readRebalance = isDataMovementForReadBalancing(reason);
state const char* eventName =
isDataMovementForMountainChopper(reason) ? "BgDDMountainChopper_New" : "BgDDValleyFiller_New";
state const char* eventName = isDataMovementForMountainChopper(reason) ? "BgDDMountainChopper" : "BgDDValleyFiller";
state int ddPriority = dataMovementPriority(reason);
state double rebalancePollingInterval = 0;
loop {
state bool moved = false;
state Reference<IDataDistributionTeam> sourceTeam;
@ -2182,40 +2245,24 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat
state GetTeamRequest destReq;
state TraceEvent traceEvent(eventName, self->distributorId);
traceEvent.suppressFor(5.0)
.detail("PollingInterval", SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL)
.detail("PollingInterval", rebalancePollingInterval)
.detail("Rebalance", readRebalance ? "Read" : "Disk");
// NOTE: DD throttling relies on DDQueue, so here we just trigger the balancer periodically
wait(delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch));
try {
// NOTE: the DD throttling relies on DDQueue
delayF = delay(SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
wait(store(skipCurrentLoop, getSkipRebalanceValue(self->txnProcessor, readRebalance)));
lastRead = now();
if (!val.present()) {
skipCurrentLoop = false;
} else {
// NOTE: check special value "" and "on" might written in old version < 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
if (readRebalance) {
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_READ) > 0;
} else {
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
}
} else {
skipCurrentLoop = true;
}
}
}
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
tr.reset();
rebalancePollingInterval =
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
continue;
} else {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
traceEvent.detail("QueuedRelocations", self->priority_relocations[ddPriority]);
@ -2235,7 +2282,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat
ForReadBalance(readRebalance),
PreferLowerReadUtil::True);
state Future<SrcDestTeamPair> getTeamFuture =
getSrcDestTeams(self, teamCollectionIndex, srcReq, destReq, ddPriority, &traceEvent);
self->getSrcDestTeams(teamCollectionIndex, srcReq, destReq, ddPriority, &traceEvent);
wait(ready(getTeamFuture));
sourceTeam = getTeamFuture.get().first;
destTeam = getTeamFuture.get().second;
@ -2243,9 +2290,9 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat
// clang-format off
if (sourceTeam.isValid() && destTeam.isValid()) {
if (readRebalance) {
wait(store(moved, rebalanceReadLoad(self, reason, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
wait(store(moved, self->rebalanceReadLoad(reason, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
} else {
wait(store(moved, rebalanceTeams(self, reason, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
wait(store(moved, self->rebalanceTeams(reason, sourceTeam, destTeam, teamCollectionIndex == 0, &traceEvent)));
}
}
// clang-format on
@ -2253,11 +2300,10 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat
}
traceEvent.detail("ResetCount", resetCount);
tr.reset();
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
wait(tr.onError(e));
throw;
}
traceEvent.detail("Moved", moved);
@ -2265,204 +2311,7 @@ ACTOR Future<Void> BgDDLoadRebalance(DDQueue* self, int teamCollectionIndex, Dat
}
}
ACTOR Future<Void> BgDDMountainChopper(DDQueue* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state Transaction tr(self->cx);
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDMountainChopper_Old", self->distributorId);
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
try {
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now();
if (!val.present()) {
// reset loop interval
if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false;
} else {
// NOTE: check the special values "" and "on" that might be written by versions older than 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
} else {
skipCurrentLoop = true;
}
}
}
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
rebalancePollingInterval =
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
tr.reset();
continue;
}
traceEvent.detail("QueuedRelocations",
self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> _randomTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::False,
PreferLowerDiskUtil::True,
TeamMustHaveShards::False))));
randomTeam = _randomTeam;
traceEvent.detail("DestTeam",
printable(randomTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> loadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::True,
PreferLowerDiskUtil::False,
TeamMustHaveShards::True))));
traceEvent.detail(
"SourceTeam",
printable(loadedTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (loadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
DataMovementReason::REBALANCE_OVERUTILIZED_TEAM,
loadedTeam.first.get(),
randomTeam.first.get(),
teamCollectionIndex == 0,
&traceEvent));
moved = _moved;
}
}
}
tr.reset();
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
wait(tr.onError(e));
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}
ACTOR Future<Void> BgDDValleyFiller(DDQueue* self, int teamCollectionIndex) {
state double rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
state Transaction tr(self->cx);
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDValleyFiller_Old", self->distributorId);
traceEvent.suppressFor(5.0).detail("PollingInterval", rebalancePollingInterval).detail("Rebalance", "Disk");
try {
state Future<Void> delayF = delay(rebalancePollingInterval, TaskPriority::DataDistributionLaunch);
if ((now() - lastRead) > SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL) {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> val = wait(tr.get(rebalanceDDIgnoreKey));
lastRead = now();
if (!val.present()) {
// reset loop interval
if (skipCurrentLoop) {
rebalancePollingInterval = SERVER_KNOBS->BG_REBALANCE_POLLING_INTERVAL;
}
skipCurrentLoop = false;
} else {
// NOTE: check the special values "" and "on" that might be written by versions older than 7.2
if (val.get().size() > 0 && val.get() != "on"_sr) {
int ddIgnore = BinaryReader::fromStringRef<uint8_t>(val.get(), Unversioned());
skipCurrentLoop = (ddIgnore & DDIgnore::REBALANCE_DISK) > 0;
} else {
skipCurrentLoop = true;
}
}
}
traceEvent.detail("Enabled", !skipCurrentLoop);
wait(delayF);
if (skipCurrentLoop) {
// set loop interval to avoid busy wait here.
rebalancePollingInterval =
std::max(rebalancePollingInterval, SERVER_KNOBS->BG_REBALANCE_SWITCH_CHECK_INTERVAL);
tr.reset();
continue;
}
traceEvent.detail("QueuedRelocations",
self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> _randomTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::False,
PreferLowerDiskUtil::False,
TeamMustHaveShards::True))));
randomTeam = _randomTeam;
traceEvent.detail("SourceTeam",
printable(randomTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>, bool> unloadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(WantNewServers::True,
WantTrueBest::True,
PreferLowerDiskUtil::True,
TeamMustHaveShards::False))));
traceEvent.detail(
"DestTeam",
printable(unloadedTeam.first.map<std::string>(
[](const Reference<IDataDistributionTeam>& team) { return team->getDesc(); })));
if (unloadedTeam.first.present()) {
bool _moved = wait(rebalanceTeams(self,
DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM,
randomTeam.first.get(),
unloadedTeam.first.get(),
teamCollectionIndex == 0,
&traceEvent));
moved = _moved;
}
}
}
tr.reset();
} catch (Error& e) {
// Log actor_cancelled because it's not legal to suppress an event that's initialized
traceEvent.errorUnsuppressed(e);
wait(tr.onError(e));
}
traceEvent.detail("Moved", moved);
traceEvent.log();
}
}
ACTOR Future<Void> dataDistributionQueue(Database cx,
ACTOR Future<Void> dataDistributionQueue(std::shared_ptr<IDDTxnProcessor> db,
PromiseStream<RelocateShard> output,
FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics,
@ -2481,7 +2330,7 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
const DDEnabledState* ddEnabledState) {
state DDQueue self(distributorId,
lock,
cx,
db,
teamCollections,
shardsAffectedByTeamFailure,
physicalShardCollection,
@ -2503,15 +2352,12 @@ ACTOR Future<Void> dataDistributionQueue(Database cx,
state Future<Void> launchQueuedWorkTimeout = Never();
for (int i = 0; i < teamCollections.size(); i++) {
// FIXME: Use BgDDLoadBalance for disk rebalance too after DD simulation test proof.
// ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
// ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_OVERUTILIZED_TEAM));
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_UNDERUTILIZED_TEAM));
if (SERVER_KNOBS->READ_SAMPLING_ENABLED) {
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_OVERUTIL_TEAM));
ddQueueFutures.push_back(BgDDLoadRebalance(&self, i, DataMovementReason::REBALANCE_READ_UNDERUTIL_TEAM));
}
ddQueueFutures.push_back(BgDDMountainChopper(&self, i));
ddQueueFutures.push_back(BgDDValleyFiller(&self, i));
}
ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingUnhealthy, processingUnhealthy, 0));
ddQueueFutures.push_back(delayedAsyncVar(self.rawProcessingWiggle, processingWiggle, 0));

View File

@ -3581,7 +3581,7 @@ bool DDTeamCollection::satisfiesPolicy(const std::vector<Reference<TCServerInfo>
return result && resultEntries.size() == 0;
}
DDTeamCollection::DDTeamCollection(Database const& cx,
DDTeamCollection::DDTeamCollection(const std::shared_ptr<IDDTxnProcessor>& db,
UID distributorId,
MoveKeysLock const& lock,
PromiseStream<RelocateShard> const& output,
@ -3597,7 +3597,7 @@ DDTeamCollection::DDTeamCollection(Database const& cx,
PromiseStream<GetMetricsRequest> getShardMetrics,
Promise<UID> removeFailedServer,
PromiseStream<Promise<int>> getUnhealthyRelocationCount)
: doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), lock(lock), output(output),
: db(db), doBuildTeams(true), lastBuildTeamsFailed(false), teamBuilder(Void()), lock(lock), output(output),
unhealthyServers(0), storageWiggler(makeReference<StorageWiggler>(this)), processingWiggle(processingWiggle),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure),
initialFailureReactionDelay(
@ -3615,8 +3615,13 @@ DDTeamCollection::DDTeamCollection(Database const& cx,
teamCollectionInfoEventHolder(makeReference<EventCacheHolder>("TeamCollectionInfo")),
storageServerRecruitmentEventHolder(
makeReference<EventCacheHolder>("StorageServerRecruitment_" + distributorId.toString())),
primary(primary), distributorId(distributorId), cx(cx), configuration(configuration),
primary(primary), distributorId(distributorId), configuration(configuration),
storageServerSet(new LocalityMap<UID>()) {
if (!db->isMocked()) {
cx = this->db->getDb();
}
if (!primary || configuration.usableRegions == 1) {
TraceEvent("DDTrackerStarting", distributorId)
.detail("State", "Inactive")
@ -5147,13 +5152,13 @@ public:
int processCount) {
Database database = DatabaseContext::create(
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);
auto txnProcessor = std::shared_ptr<IDDTxnProcessor>(new DDTxnProcessor(database));
DatabaseConfiguration conf;
conf.storageTeamSize = teamSize;
conf.storagePolicy = policy;
auto collection =
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(txnProcessor,
UID(0, 0),
MoveKeysLock(),
PromiseStream<RelocateShard>(),
@ -5191,13 +5196,13 @@ public:
int processCount) {
Database database = DatabaseContext::create(
makeReference<AsyncVar<ClientDBInfo>>(), Never(), LocalityData(), EnableLocalityLoadBalance::False);
auto txnProcessor = std::shared_ptr<IDDTxnProcessor>(new DDTxnProcessor(database));
DatabaseConfiguration conf;
conf.storageTeamSize = teamSize;
conf.storagePolicy = policy;
auto collection =
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(database,
std::unique_ptr<DDTeamCollection>(new DDTeamCollection(txnProcessor,
UID(0, 0),
MoveKeysLock(),
PromiseStream<RelocateShard>(),

View File

@ -22,6 +22,7 @@
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbclient/DatabaseContext.h"
#include "flow/actorcompiler.h" // This must be the last #include.
class DDTxnProcessorImpl {
@ -450,6 +451,20 @@ class DDTxnProcessorImpl {
}
}
}
ACTOR static Future<Optional<Value>> readRebalanceDDIgnoreKey(Database cx) {
state Transaction tr(cx);
loop {
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
Optional<Value> res = wait(tr.get(rebalanceDDIgnoreKey));
return res;
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
};
Future<IDDTxnProcessor::SourceServers> DDTxnProcessor::getSourceServersForRange(const KeyRangeRef range) {
@ -493,4 +508,12 @@ Future<bool> DDTxnProcessor::isDataDistributionEnabled(const DDEnabledState* ddE
Future<Void> DDTxnProcessor::pollMoveKeysLock(const MoveKeysLock& lock, const DDEnabledState* ddEnabledState) const {
return DDTxnProcessorImpl::pollMoveKeysLock(cx, lock, ddEnabledState);
}
}
Future<HealthMetrics> DDTxnProcessor::getHealthMetrics(bool detailed) const {
return cx->getHealthMetrics(detailed);
}
Future<Optional<Value>> DDTxnProcessor::readRebalanceDDIgnoreKey() const {
return DDTxnProcessorImpl::readRebalanceDDIgnoreKey(cx);
}

View File

@ -644,7 +644,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
"DDTracker",
self->ddId,
&normalDDQueueErrors()));
actors.push_back(reportErrorsExcept(dataDistributionQueue(cx,
actors.push_back(reportErrorsExcept(dataDistributionQueue(self->txnProcessor,
self->relocationProducer,
self->relocationConsumer.getFuture(),
getShardMetrics,
@ -679,7 +679,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
std::vector<DDTeamCollection*> teamCollectionsPtrs;
primaryTeamCollection = makeReference<DDTeamCollection>(
cx,
self->txnProcessor,
self->ddId,
self->lock,
self->relocationProducer,
@ -700,7 +700,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributor> self,
self->dbInfo, [](auto const& info) { return info.clusterInterface.recruitStorage; });
if (self->configuration.usableRegions > 1) {
remoteTeamCollection =
makeReference<DDTeamCollection>(cx,
makeReference<DDTeamCollection>(self->txnProcessor,
self->ddId,
self->lock,
self->relocationProducer,

View File

@ -18,6 +18,14 @@
* limitations under the License.
*/
#if !defined(_WIN32) && !defined(__APPLE__) && !defined(__INTEL_COMPILER)
#define BOOST_SYSTEM_NO_LIB
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include <boost/process.hpp>
#endif
#include <boost/algorithm/string.hpp>
#include "flow/TLSConfig.actor.h"
#include "flow/Trace.h"
#include "flow/Platform.h"
@ -34,14 +42,6 @@
#include "fdbserver/Knobs.h"
#include "fdbserver/RemoteIKeyValueStore.actor.h"
#if !defined(_WIN32) && !defined(__APPLE__) && !defined(__INTEL_COMPILER)
#define BOOST_SYSTEM_NO_LIB
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include <boost/process.hpp>
#endif
#include <boost/algorithm/string.hpp>
#include "flow/actorcompiler.h" // This must be the last #include.
ExecCmdValueString::ExecCmdValueString(StringRef pCmdValueString) {

View File

@ -365,7 +365,7 @@ ACTOR Future<Void> pullAsyncData(LogRouterData* self) {
if (!foundMessage) {
ver--; // ver is the next possible version we will get data for
if (ver > self->version.get()) {
if (ver > self->version.get() && ver >= r->popped()) {
wait(waitForVersion(self, ver));
self->version.set(ver);

View File

@ -2172,6 +2172,10 @@ void setupSimulatedSystem(std::vector<Future<Void>>* systemActors,
}
ASSERT(coordinatorAddresses.size() > 0);
// Mark a random majority of the coordinators as protected, so
// we won't accidentally kill off a quorum and render the
// cluster unrecoverable.
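// E.g. with 5 coordinators, (5 / 2) + 1 = 3 of them (a majority) are protected below.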
deterministicRandom()->randomShuffle(coordinatorAddresses);
for (int i = 0; i < (coordinatorAddresses.size() / 2) + 1; i++) {
TraceEvent("ProtectCoordinator")

View File

@ -28,7 +28,6 @@
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/Histogram.h"
#include "flow/PriorityMultiLock.actor.h"
#include <limits>
#include <random>
#include "fdbrpc/ContinuousSample.h"
@ -103,6 +102,201 @@ std::string addPrefix(std::string prefix, std::string lines) {
return s;
}
#define PRIORITYMULTILOCK_DEBUG 0
// A multi-user lock with a concurrent holder limit. Waiters are granted the lock according to
// an integer priority from 0 to maxPriority, inclusive; higher integers are given priority.
//
// The interface is similar to FlowMutex except that lock holders can drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>.
// Calling release() is not necessary; it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
private:
struct Waiter {
Waiter() : queuedTime(now()) {}
Promise<Lock> lockPromise;
double queuedTime;
};
typedef Deque<Waiter> Queue;
#if PRIORITYMULTILOCK_DEBUG
#define prioritylock_printf(...) printf(__VA_ARGS__)
#else
#define prioritylock_printf(...)
#endif
public:
PriorityMultiLock(int concurrency, int maxPriority, int launchLimit = std::numeric_limits<int>::max())
: concurrency(concurrency), available(concurrency), waiting(0), launchLimit(launchLimit) {
waiters.resize(maxPriority + 1);
fRunner = runner(this);
}
~PriorityMultiLock() { prioritylock_printf("destruct"); }
Future<Lock> lock(int priority = 0) {
prioritylock_printf("lock begin %s\n", toString().c_str());
// This shortcut may enable a waiter to jump the line when the releaser loop yields
if (available > 0) {
--available;
Lock p;
addRunner(p);
prioritylock_printf("lock exit immediate %s\n", toString().c_str());
return p;
}
Waiter w;
waiters[priority].push_back(w);
++waiting;
prioritylock_printf("lock exit queued %s\n", toString().c_str());
return w.lockPromise.getFuture();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone);
for (int i = 0; i < waiters.size(); ++i) {
s += format("p%d_waiters=%u ", i, waiters[i].size());
}
s += "}";
return s;
}
private:
void addRunner(Lock& lock) {
runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
prioritylock_printf("Lock released\n");
++available;
if (waiting > 0 || runners.size() > 100) {
release.trigger();
}
return Void();
}));
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
state int maxPriority = self->waiters.size() - 1;
// Priority to try to run tasks from next
state int priority = maxPriority;
state Queue* pQueue = &self->waiters[maxPriority];
// Track the number of waiters unlocked at the same priority in a row
state int lastPriorityCount = 0;
loop {
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
wait(self->release.onTrigger());
prioritylock_printf("runner wakeup %s\n", self->toString().c_str());
if (++sinceYield == 1000) {
sinceYield = 0;
wait(delay(0));
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
prioritylock_printf("Checking priority=%d lastPriorityCount=%d %s\n",
priority,
lastPriorityCount,
self->toString().c_str());
while (!pQueue->empty() && ++lastPriorityCount < self->launchLimit) {
Waiter w = pQueue->front();
pQueue->pop_front();
--self->waiting;
Lock lock;
prioritylock_printf(" Running waiter priority=%d wait=%f %s\n",
priority,
now() - w.queuedTime,
self->toString().c_str());
w.lockPromise.send(lock);
// Self may have been destructed during the lock callback
if (error.isReady()) {
throw error.getError();
}
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->addRunner(lock);
// A slot has been consumed, so stop reading from this queue if there aren't any more
if (--self->available == 0) {
break;
}
}
}
// If there are no more slots available, then don't move to the next priority
if (self->available == 0) {
break;
}
// Decrease priority, wrapping around to max from 0
if (priority == 0) {
priority = maxPriority;
} else {
--priority;
}
pQueue = &self->waiters[priority];
lastPriorityCount = 0;
}
}
}
int concurrency;
int available;
int waiting;
int launchLimit;
std::vector<Queue> waiters;
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger release;
Promise<Void> brokenOnDestruct;
};
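// Illustrative usage sketch (not part of this patch): each work item holds one of the
// `concurrency` slots while it runs; dropping the Lock (or calling release()) frees the slot.
ACTOR static Future<Void> examplePrioritizedWork(PriorityMultiLock* pml, int priority) {
state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
wait(delay(0.01)); // do the actual work while holding a slot
lock.release(); // optional; letting all copies of `lock` go out of scope also releases
return Void();
}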
// Some convenience functions for debugging to stringify various structures
// Classes can add compatibility by either specializing toString<T> or implementing
// std::string toString() const;
@ -1471,8 +1665,6 @@ struct RedwoodMetrics {
kvSizeReadByGetRange = Reference<Histogram>(
new Histogram(Reference<HistogramRegistry>(), "kvSize", "ReadByGetRange", Histogram::Unit::bytes));
ioLock = nullptr;
// These histograms are used for Btree events, hence level > 0
unsigned int levelCounter = 0;
for (RedwoodMetrics::Level& level : levels) {
@ -1515,8 +1707,6 @@ struct RedwoodMetrics {
// btree levels and one extra level for non btree level.
Level levels[btreeLevels + 1];
metrics metric;
// pointer to the priority multi lock used in pager
PriorityMultiLock* ioLock;
Reference<Histogram> kvSizeWritten;
Reference<Histogram> kvSizeReadByGet;
@ -1571,12 +1761,9 @@ struct RedwoodMetrics {
// The string is a reasonably well formatted page of information
void getFields(TraceEvent* e, std::string* s = nullptr, bool skipZeroes = false);
void getIOLockFields(TraceEvent* e, std::string* s = nullptr);
std::string toString(bool clearAfter) {
std::string s;
getFields(nullptr, &s);
getIOLockFields(nullptr, &s);
if (clearAfter) {
clear();
@ -1611,7 +1798,6 @@ ACTOR Future<Void> redwoodMetricsLogger() {
double elapsed = now() - g_redwoodMetrics.startTime;
e.detail("Elapsed", elapsed);
g_redwoodMetrics.getFields(&e);
g_redwoodMetrics.getIOLockFields(&e);
g_redwoodMetrics.clear();
}
}
@ -2008,7 +2194,7 @@ public:
bool memoryOnly,
Reference<IPageEncryptionKeyProvider> keyProvider,
Promise<Void> errorPromise = {})
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_PRIORITY_LAUNCHS),
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, ioMaxPriority, FLOW_KNOBS->MAX_OUTSTANDING / 2),
pageCacheBytes(pageCacheSizeBytes), desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize),
filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
remapCleanupWindowBytes(remapCleanupWindowBytes), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
@ -2020,7 +2206,6 @@ public:
// This sets the page cache size for all PageCacheT instances using the same evictor
pageCache.evictor().sizeLimit = pageCacheBytes;
g_redwoodMetrics.ioLock = &ioLock;
if (!g_redwoodMetricsActor.isValid()) {
g_redwoodMetricsActor = redwoodMetricsLogger();
}
@ -7510,7 +7695,8 @@ RedwoodRecordRef VersionedBTree::dbEnd("\xff\xff\xff\xff\xff"_sr);
class KeyValueStoreRedwood : public IKeyValueStore {
public:
KeyValueStoreRedwood(std::string filename, UID logID, Reference<IPageEncryptionKeyProvider> encryptionKeyProvider)
: m_filename(filename), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
: m_filename(filename), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0),
prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
int pageSize =
BUGGIFY ? deterministicRandom()->randomInt(1000, 4096 * 4) : SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE;
@ -7570,8 +7756,6 @@ public:
ACTOR void shutdown(KeyValueStoreRedwood* self, bool dispose) {
TraceEvent(SevInfo, "RedwoodShutdown").detail("Filename", self->m_filename).detail("Dispose", dispose);
g_redwoodMetrics.ioLock = nullptr;
// In simulation, if the instance is being disposed of then sometimes run destructive sanity check.
if (g_network->isSimulated() && dispose && BUGGIFY) {
// Only proceed if the last commit is a success, but don't throw if it's not because shutdown
@ -7672,6 +7856,7 @@ public:
f.get();
} else {
CODE_PROBE(true, "Uncached forward range read seek");
wait(store(lock, self->m_concurrentReads.lock()));
wait(f);
}
@ -7727,6 +7912,7 @@ public:
f.get();
} else {
CODE_PROBE(true, "Uncached reverse range read seek");
wait(store(lock, self->m_concurrentReads.lock()));
wait(f);
}
@ -7793,6 +7979,9 @@ public:
wait(self->m_tree->initBTreeCursor(
&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead, options));
// Not locking for point reads, instead relying on IO priority lock
// state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock());
++g_redwoodMetrics.metric.opGet;
wait(cur.seekGTE(key));
if (cur.isValid() && cur.get().key == key) {
@ -7828,6 +8017,7 @@ private:
Future<Void> m_init;
Promise<Void> m_closed;
Promise<Void> m_error;
PriorityMultiLock m_concurrentReads;
bool prefetch;
Version m_nextCommitVersion;
Reference<IPageEncryptionKeyProvider> m_keyProvider;
@ -8463,43 +8653,6 @@ void RedwoodMetrics::getFields(TraceEvent* e, std::string* s, bool skipZeroes) {
}
}
void RedwoodMetrics::getIOLockFields(TraceEvent* e, std::string* s) {
if (ioLock == nullptr)
return;
int maxPriority = ioLock->maxPriority();
if (e != nullptr) {
e->detail("ActiveReads", ioLock->totalRunners());
e->detail("AwaitReads", ioLock->totalWaiters());
for (int priority = 0; priority <= maxPriority; ++priority) {
e->detail(format("ActiveP%d", priority), ioLock->numRunners(priority));
e->detail(format("AwaitP%d", priority), ioLock->numWaiters(priority));
}
}
if (s != nullptr) {
std::string active = "Active";
std::string await = "Await";
*s += "\n";
*s += format("%-15s %-8u ", "ActiveReads", ioLock->totalRunners());
*s += format("%-15s %-8u ", "AwaitReads", ioLock->totalWaiters());
*s += "\n";
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (active + 'P' + std::to_string(priority)).c_str(), ioLock->numRunners(priority));
}
*s += "\n";
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (await + 'P' + std::to_string(priority)).c_str(), ioLock->numWaiters(priority));
}
}
}
TEST_CASE("/redwood/correctness/unit/RedwoodRecordRef") {
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[0] == 3);
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[1] == 4);
@ -10977,57 +11130,3 @@ TEST_CASE(":/redwood/performance/histograms") {
return Void();
}
ACTOR Future<Void> waitLockIncrement(PriorityMultiLock* pml, int priority, int* pout) {
state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
wait(delay(deterministicRandom()->random01() * .1));
++*pout;
return Void();
}
TEST_CASE("/redwood/PriorityMultiLock") {
state std::vector<int> priorities = { 10, 20, 40 };
state int concurrency = 25;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities.size() - 1); // maxPriority is inclusive
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock by taking concurrency locks at each level
state std::vector<Future<PriorityMultiLock::Lock>> lockFutures;
for (int i = 0; i < priorities.size(); ++i) {
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(i));
}
}
// Wait for n = concurrency locks to be acquired
wait(quorum(lockFutures, concurrency));
state std::vector<Future<Void>> futures;
for (int i = 0; i < 10e3; ++i) {
int p = i % priorities.size();
futures.push_back(waitLockIncrement(pml, p, &counts[p]));
}
state Future<Void> f = waitForAll(futures);
// Release the locks
lockFutures.clear();
// Print stats and wait for all futures to be ready
loop {
choose {
when(wait(delay(1))) {
printf("counts: ");
for (auto c : counts) {
printf("%d ", c);
}
printf(" pml: %s\n", pml->toString().c_str());
}
when(wait(f)) { break; }
}
}
delete pml;
return Void();
}

View File

@ -88,7 +88,11 @@
#if defined(__linux__) || defined(__FreeBSD__)
#include <execinfo.h>
#include <signal.h>
#if defined(__linux__)
#include <sys/prctl.h>
#elif defined(__FreeBSD__)
#include <sys/procctl.h>
#endif
#ifdef ALLOC_INSTRUMENTATION
#include <cxxabi.h>
#endif
@ -2396,10 +2400,15 @@ int main(int argc, char* argv[]) {
g_network->getLocalAddress(), opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
auto m = startSystemMonitor(opts.dataFolder, opts.dcId, opts.zoneId, opts.zoneId);
TraceEvent(SevDebug, "StartingFlowProcess").detail("FlowProcessName", opts.flowProcessName);
#if defined(__linux__) || defined(__FreeBSD__)
#if defined(__linux__)
prctl(PR_SET_PDEATHSIG, SIGTERM);
if (getppid() == 1) /* parent already died before prctl */
flushAndExit(FDB_EXIT_SUCCESS);
#elif defined(__FreeBSD__)
const int sig = SIGTERM;
procctl(P_PID, 0, PROC_PDEATHSIG_CTL, (void*)&sig);
if (getppid() == 1) /* parent already died before procctl */
flushAndExit(FDB_EXIT_SUCCESS);
#endif
if (opts.flowProcessName == "KeyValueStoreProcess") {

View File

@ -38,17 +38,6 @@
#include "flow/actorcompiler.h" // has to be last include
struct GranuleHistory {
KeyRange range;
Version version;
Standalone<BlobGranuleHistoryValue> value;
GranuleHistory() {}
GranuleHistory(KeyRange range, Version version, Standalone<BlobGranuleHistoryValue> value)
: range(range), version(version), value(value) {}
};
// Stores info about a file in blob storage
struct BlobFileIndex {
Version version;

View File

@ -603,6 +603,7 @@ class DDTeamCollection : public ReferenceCounted<DDTeamCollection> {
int addTeamsBestOf(int teamsToBuild, int desiredTeams, int maxTeams);
public:
std::shared_ptr<IDDTxnProcessor> db;
Database cx;
DatabaseConfiguration configuration;
@ -620,7 +621,7 @@ public:
AsyncTrigger printDetailedTeamsInfo;
Reference<LocalitySet> storageServerSet;
DDTeamCollection(Database const& cx,
DDTeamCollection(const std::shared_ptr<IDDTxnProcessor>& db,
UID distributorId,
MoveKeysLock const& lock,
PromiseStream<RelocateShard> const& output,

View File

@ -36,6 +36,8 @@ public:
struct SourceServers {
std::vector<UID> srcServers, completeSources; // the same as RelocateData.src, RelocateData.completeSources;
};
virtual Database getDb() const = 0;
virtual bool isMocked() const = 0;
// get the source server list and complete source server list for range
virtual Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) = 0;
@ -76,6 +78,10 @@ public:
const DDEnabledState* ddEnabledState) const = 0;
virtual Future<Void> moveKeys(const MoveKeysParams& params) const = 0;
virtual Future<HealthMetrics> getHealthMetrics(bool detailed = false) const = 0;
virtual Future<Optional<Value>> readRebalanceDDIgnoreKey() const { return {}; }
};
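// Illustrative sketch (not part of this patch) of how a mocked processor might satisfy the
// two new virtuals; the real DDMockTxnProcessor definitions may differ:
//   Database getDb() const override { return Database(); } // no real cluster behind a mock
//   bool isMocked() const override { return true; }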
class DDTxnProcessorImpl;
@ -89,6 +95,9 @@ public:
DDTxnProcessor() = default;
explicit DDTxnProcessor(Database cx) : cx(cx) {}
Database getDb() const override { return cx; };
bool isMocked() const override { return false; };
Future<SourceServers> getSourceServersForRange(const KeyRangeRef range) override;
// Call NativeAPI implementation directly
@ -129,6 +138,10 @@ public:
}
Future<Void> moveKeys(const MoveKeysParams& params) const override { return ::moveKeys(cx, params); }
Future<HealthMetrics> getHealthMetrics(bool detailed) const override;
Future<Optional<Value>> readRebalanceDDIgnoreKey() const override;
};
// A mock transaction implementation for test usage.

View File

@ -248,7 +248,7 @@ FDB_DECLARE_BOOLEAN_PARAM(MoveKeyRangeOutPhysicalShard);
class PhysicalShardCollection : public ReferenceCounted<PhysicalShardCollection> {
public:
PhysicalShardCollection() : requireTransition(false), lastTransitionStartTime(now()) {}
PhysicalShardCollection() : lastTransitionStartTime(now()), requireTransition(false) {}
enum class PhysicalShardCreationTime { DDInit, DDRelocator };
@ -514,14 +514,14 @@ ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> in
bool* trackerCancelled,
Optional<Reference<TenantCache>> ddTenantCache);
ACTOR Future<Void> dataDistributionQueue(Database cx,
ACTOR Future<Void> dataDistributionQueue(std::shared_ptr<IDDTxnProcessor> db,
PromiseStream<RelocateShard> output,
FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetTopKMetricsRequest> getTopKMetrics,
Reference<AsyncVar<bool>> processingUnhealthy,
Reference<AsyncVar<bool>> processingWiggle,
std::vector<TeamCollectionInterface> teamCollection,
std::vector<TeamCollectionInterface> teamCollections,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
Reference<PhysicalShardCollection> physicalShardCollection,
MoveKeysLock lock,

View File

@ -101,8 +101,8 @@ struct NoOptions {};
struct FailureInjectionWorkload : TestWorkload {
FailureInjectionWorkload(WorkloadContext const&);
virtual ~FailureInjectionWorkload() {}
virtual bool add(DeterministicRandom& random, WorkloadRequest const& work, CompoundWorkload const& workload);
virtual void initFailureInjectionMode(DeterministicRandom& random, unsigned count);
virtual void initFailureInjectionMode(DeterministicRandom& random);
virtual bool shouldInject(DeterministicRandom& random, const WorkloadRequest& work, const unsigned count) const;
Future<Void> setupInjectionWorkload(Database const& cx, Future<Void> done);
Future<Void> startInjectionWorkload(Database const& cx, Future<Void> done);
@ -137,6 +137,9 @@ struct CompoundWorkload : TestWorkload {
CompoundWorkload(WorkloadContext& wcx);
CompoundWorkload* add(Reference<TestWorkload>&& w);
void addFailureInjection(WorkloadRequest& work);
bool shouldInjectFailure(DeterministicRandom& random,
const WorkloadRequest& work,
Reference<FailureInjectionWorkload> failureInjection) const;
std::string description() const override;

View File

@ -37,7 +37,6 @@
#include "flow/Error.h"
#include "flow/Hash3.h"
#include "flow/Histogram.h"
#include "flow/PriorityMultiLock.actor.h"
#include "flow/IRandom.h"
#include "flow/IndexedSet.h"
#include "flow/SystemMonitor.h"
@ -1023,9 +1022,6 @@ public:
FlowLock serveFetchCheckpointParallelismLock;
PriorityMultiLock ssLock;
std::vector<int> readPriorityRanks;
int64_t instanceID;
Promise<Void> otherError;
@ -1291,15 +1287,13 @@ public:
changeFeedDiskReadsLock(SERVER_KNOBS->CHANGE_FEED_DISK_READS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
ssLock(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
busiestWriteTagContext(ssi.id()), counters(this),
storageServerSourceTLogIDEventHolder(
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READ_RANKS, ',');
ASSERT(readPriorityRanks.size() > (int)ReadType::MAX);
version.initMetric("StorageServer.Version"_sr, counters.cc.id);
oldestVersion.initMetric("StorageServer.OldestVersion"_sr, counters.cc.id);
durableVersion.initMetric("StorageServer.DurableVersion"_sr, counters.cc.id);
@ -1856,7 +1850,6 @@ std::vector<StorageServerShard> StorageServer::getStorageServerShards(KeyRangeRe
ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
state int64_t resultSize = 0;
state PriorityMultiLock::Lock lock;
Span span("SS:getValue"_loc, req.spanContext);
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -1865,8 +1858,6 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// Temporarily disabled -- this path is hit a lot
// getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
state ReadType type = req.options.present() ? req.options.get().type : ReadType::NORMAL;
try {
++data->counters.getValueQueries;
++data->counters.allQueries;
@ -1877,8 +1868,6 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// so we need to downgrade here
wait(data->getQueryDelay());
wait(store(lock, data->ssLock.lock(data->readPriorityRanks[(int)type])));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -2978,8 +2967,10 @@ ACTOR Future<Void> changeFeedStreamQ(StorageServer* data, ChangeFeedStreamReques
++data->counters.feedStreamQueries;
// FIXME: do something more sophisticated here besides hard limit
if (data->activeFeedQueries >= SERVER_KNOBS->STORAGE_FEED_QUERY_HARD_LIMIT ||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.005))) {
// Allow other storage servers fetching feeds to go above this limit; currently, req.canReadPopped == false
// means the read is a fetch from another storage server
if (!req.canReadPopped && (data->activeFeedQueries >= SERVER_KNOBS->STORAGE_FEED_QUERY_HARD_LIMIT ||
(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.005)))) {
req.reply.sendError(storage_too_many_feed_streams());
++data->counters.rejectedFeedStreamQueries;
return Void();
@ -3732,7 +3723,6 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
state Span span("SS:getKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<ReadOptions> options = req.options;
state ReadType type = options.present() ? options.get().type : ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -3747,13 +3737,12 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
wait(data->getQueryDelay());
if (!SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && type == ReadType::FETCH) {
type = ReadType::NORMAL;
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(data->getQueryDelay());
}
state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[(int)type]));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -4478,7 +4467,6 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<ReadOptions> options = req.options;
state ReadType type = options.present() ? options.get().type : ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4493,13 +4481,12 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
wait(data->getQueryDelay());
if (!SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && type == ReadType::FETCH) {
type = ReadType::NORMAL;
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(data->getQueryDelay());
}
state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[(int)type]));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -4696,7 +4683,6 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<ReadOptions> options = req.options;
state ReadType type = options.present() ? options.get().type : ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4710,14 +4696,12 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
wait(delay(0, TaskPriority::DefaultEndpoint));
if (!SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && type == ReadType::FETCH) {
type = ReadType::NORMAL;
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(delay(0, TaskPriority::DefaultEndpoint));
}
state int readPriority = data->readPriorityRanks[(int)type];
state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
try {
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
@ -4888,8 +4872,12 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
end = lastKey;
}
lock.release();
wait(store(lock, data->ssLock.lock(readPriority)));
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() &&
req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(delay(0, TaskPriority::DefaultEndpoint));
}
data->transactionTagCounter.addRequest(req.tags, resultSize);
}
@ -4910,19 +4898,14 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state Span span("SS:getKey"_loc, req.spanContext);
state PriorityMultiLock::Lock lock;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
state int64_t resultSize = 0;
state ReadOptions options;
state ReadType type = ReadType::NORMAL;
if (req.options.present()) {
options = req.options.get();
type = options.type;
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
++data->counters.getKeyQueries;
@ -4934,8 +4917,6 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
// so we need to downgrade here
wait(data->getQueryDelay());
wait(store(lock, data->ssLock.lock(data->readPriorityRanks[(int)type])));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -6490,8 +6471,11 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state int debug_nextRetryToLog = 1;
state Error lastError;
// TODO: update to FETCH once the priority multi lock is used.
// Leaving the read type off for now to prevent data fetches from stalling under heavy load;
// it would be used to inform the storage server that the range read is for a fetch.
state ReadOptions options = ReadOptions(fetchKeysID, ReadType::FETCH);
// state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::FETCH);
state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::NORMAL);
// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server
// we must refresh the cache manually.
@ -9842,21 +9826,6 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
[self = self](TraceEvent& te) {
te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
te.detail("Tag", self->tag.toString());
std::vector<int> rpr = self->readPriorityRanks;
te.detail("ReadsActive", self->ssLock.totalRunners());
te.detail("ReadsWaiting", self->ssLock.totalWaiters());
int type = (int)ReadType::FETCH;
te.detail("ReadFetchActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadFetchWaiting", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::LOW;
te.detail("ReadLowActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadLowWaiting", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::NORMAL;
te.detail("ReadNormalActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadNormalWaiting", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::HIGH;
te.detail("ReadHighActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadHighWaiting", self->ssLock.numWaiters(rpr[type]));
StorageBytes sb = self->storage.getStorageBytes();
te.detail("KvstoreBytesUsed", sb.used);
te.detail("KvstoreBytesFree", sb.free);
@ -10713,9 +10682,6 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
// If the storage server dies while something that uses self is still on the stack,
// we want that actor to complete before we terminate and that memory goes out of scope
self.ssLock.kill();
state Error err = e;
if (storageServerTerminated(self, persistentData, err)) {
ssCore.cancel();
@ -10836,9 +10802,6 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
throw internal_error();
} catch (Error& e) {
self.ssLock.kill();
if (self.byteSampleRecovery.isValid()) {
self.byteSampleRecovery.cancel();
}

View File

@ -399,13 +399,25 @@ void CompoundWorkload::addFailureInjection(WorkloadRequest& work) {
if (disabledWorkloads.count(workload->description()) > 0) {
continue;
}
while (workload->add(random, work, *this)) {
while (shouldInjectFailure(random, work, workload)) {
workload->initFailureInjectionMode(random);
failureInjection.push_back(workload);
workload = factory->create(*this);
}
}
}
bool CompoundWorkload::shouldInjectFailure(DeterministicRandom& random,
const WorkloadRequest& work,
Reference<FailureInjectionWorkload> failure) const {
auto desc = failure->description();
unsigned alreadyAdded =
std::count_if(workloads.begin(), workloads.end(), [&desc](auto const& w) { return w->description() == desc; });
alreadyAdded += std::count_if(
failureInjection.begin(), failureInjection.end(), [&desc](auto const& w) { return w->description() == desc; });
return failure->shouldInject(random, work, alreadyAdded);
}
Future<std::vector<PerfMetric>> CompoundWorkload::getMetrics() {
return getMetricsCompoundWorkload(this);
}
@ -425,24 +437,13 @@ void TestWorkload::disableFailureInjectionWorkloads(std::set<std::string>& out)
FailureInjectionWorkload::FailureInjectionWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {}
bool FailureInjectionWorkload::add(DeterministicRandom& random,
const WorkloadRequest& work,
const CompoundWorkload& workload) {
auto desc = description();
unsigned alreadyAdded = std::count_if(workload.workloads.begin(), workload.workloads.end(), [&desc](auto const& w) {
return w->description() == desc;
});
alreadyAdded += std::count_if(workload.failureInjection.begin(),
workload.failureInjection.end(),
[&desc](auto const& w) { return w->description() == desc; });
bool willAdd = alreadyAdded < 3 && work.useDatabase && 0.1 / (1 + alreadyAdded) > random.random01();
if (willAdd) {
initFailureInjectionMode(random, alreadyAdded);
}
return willAdd;
}
void FailureInjectionWorkload::initFailureInjectionMode(DeterministicRandom& random) {}
void FailureInjectionWorkload::initFailureInjectionMode(DeterministicRandom& random, unsigned count) {}
bool FailureInjectionWorkload::shouldInject(DeterministicRandom& random,
const WorkloadRequest& work,
const unsigned alreadyAdded) const {
return alreadyAdded < 3 && work.useDatabase && 0.1 / (1 + alreadyAdded) > random.random01();
}
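// Illustrative expectation (not part of this patch): under the 0.1 / (1 + alreadyAdded) rule a
// given injector is added with probability 0.1 the first time, 0.05 the second, ~0.033 the
// third, and never a fourth time (alreadyAdded < 3).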
Future<Void> FailureInjectionWorkload::setupInjectionWorkload(const Database& cx, Future<Void> done) {
return holdWhile(this->setup(cx), done);

View File

@ -129,7 +129,13 @@ struct ThreadData : ReferenceCounted<ThreadData>, NonCopyable {
}
// TODO could make keys variable length?
Key getKey(uint32_t key, uint32_t id) { return Tuple().append((int64_t)key).append((int64_t)id).pack(); }
Key getKey(uint32_t key, uint32_t id) {
std::stringstream ss;
ss << std::setw(32) << std::setfill('0') << id;
Standalone<StringRef> str(ss.str());
Tuple::UserTypeStr udt(0x41, str);
return Tuple::makeTuple((int64_t)key, udt).pack();
}
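// Illustrative round-trip (not part of this patch), assuming Tuple::unpackUserType() accepts
// the 0x41 user-type element packed above:
//   Key k = getKey(7, 3); // packs (int64 7, the 32-char zero-padded string for 3 as user type 0x41)
//   ASSERT(Tuple::unpackUserType(k).getInt(0) == 7);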
void validateGranuleBoundary(Key k, Key e, Key lastKey) {
if (k == allKeys.begin || k == allKeys.end) {
@ -138,11 +144,11 @@ struct ThreadData : ReferenceCounted<ThreadData>, NonCopyable {
// Fully formed tuples are inserted. The expectation is boundaries should be a
// sub-tuple of the inserted key.
Tuple t = Tuple::unpack(k, true);
Tuple t = Tuple::unpackUserType(k, true);
if (SERVER_KNOBS->BG_KEY_TUPLE_TRUNCATE_OFFSET) {
Tuple t2;
try {
t2 = Tuple::unpack(lastKey);
t2 = Tuple::unpackUserType(lastKey);
} catch (Error& e) {
// Ignore being unable to parse lastKey as it may be a dummy key.
}

View File

@ -1218,7 +1218,7 @@ struct BlobGranuleVerifierWorkload : TestWorkload {
}
Future<bool> check(Database const& cx) override {
if (clientId == 0 || !doForcePurge) {
if (clientId == 0 || (!doForcePurge && !purgeAtLatest)) {
return _check(cx, this);
}
return true;

View File

@ -1109,6 +1109,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
}
if (foundExtraDataStore) {
wait(delay(10)); // let the cluster get to fully_recovered after the reboot before retrying
self->testFailure("Extra data stores present on workers");
return false;
}

View File

@ -63,6 +63,8 @@ struct DataLossRecoveryWorkload : TestWorkload {
Future<Void> setup(Database const& cx) override { return Void(); }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("MoveKeysWorkload"); }
Future<Void> start(Database const& cx) override {
if (!enabled) {
return Void();

View File

@ -66,7 +66,7 @@ struct DiskFailureInjectionWorkload : FailureInjectionWorkload {
periodicBroadcastInterval = getOption(options, "periodicBroadcastInterval"_sr, periodicBroadcastInterval);
}
void initFailureInjectionMode(DeterministicRandom& random, unsigned count) override { enabled = clientId == 0; }
void initFailureInjectionMode(DeterministicRandom& random) override { enabled = clientId == 0; }
std::string description() const override {
if (g_simulator == g_network)

View File

@ -112,26 +112,10 @@ struct MachineAttritionWorkload : FailureInjectionWorkload {
allowFaultInjection = getOption(options, "allowFaultInjection"_sr, allowFaultInjection);
}
bool add(DeterministicRandom& random, WorkloadRequest const& work, CompoundWorkload const& workload) override {
auto desc = this->description();
unsigned alreadyAdded = std::count_if(workload.workloads.begin(),
workload.workloads.end(),
[&desc](auto const& w) { return w->description() == desc; });
alreadyAdded += std::count_if(workload.failureInjection.begin(),
workload.failureInjection.end(),
[&desc](auto const& w) { return w->description() == desc; });
auto res = work.useDatabase && random.random01() < 1.0 / (2.0 + alreadyAdded);
if (res) {
initializeForInjection(random);
}
TraceEvent("AddingFailureInjection")
.detail("Reboot", reboot)
.detail("Replacement", replacement)
.detail("AllowFaultInjection", allowFaultInjection)
.detail("KillDC", killDc)
.detail("KillDataHall", killDatahall)
.detail("KillZone", killZone);
return res;
bool shouldInject(DeterministicRandom& random,
const WorkloadRequest& work,
const unsigned alreadyAdded) const override {
return work.useDatabase && random.random01() < 1.0 / (2.0 + alreadyAdded);
}
void initializeForInjection(DeterministicRandom& random) {
@ -152,6 +136,13 @@ struct MachineAttritionWorkload : FailureInjectionWorkload {
killDatahall = dataHalls.size() > 0 && killDc && random.random01() < 0.5;
killZone = zones.size() > 0 && random.random01() < 0.2;
}
TraceEvent("AddingFailureInjection")
.detail("Reboot", reboot)
.detail("Replacement", replacement)
.detail("AllowFaultInjection", allowFaultInjection)
.detail("KillDC", killDc)
.detail("KillDataHall", killDatahall)
.detail("KillZone", killZone);
}
static std::vector<ISimulator::ProcessInfo*> getServers() {

View File

@ -70,6 +70,8 @@ struct PhysicalShardMoveWorkLoad : TestWorkload {
return _start(this, cx);
}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("MoveKeysWorkload"); }
ACTOR Future<Void> _start(PhysicalShardMoveWorkLoad* self, Database cx) {
int ignore = wait(setDDMode(cx, 0));
state std::map<Key, Value> kvs({ { "TestKeyA"_sr, "TestValueA"_sr },

View File

@ -43,23 +43,18 @@ struct RandomCloggingWorkload : FailureInjectionWorkload {
swizzleClog = getOption(options, "swizzle"_sr, swizzleClog);
}
bool add(DeterministicRandom& random, WorkloadRequest const& work, CompoundWorkload const& workload) override {
auto desc = description();
unsigned alreadyAdded = std::count_if(workload.workloads.begin(),
workload.workloads.end(),
[&desc](auto const& w) { return w->description() == desc; });
alreadyAdded += std::count_if(workload.failureInjection.begin(),
workload.failureInjection.end(),
[&desc](auto const& w) { return w->description() == desc; });
bool willAdd = work.useDatabase && 0.25 / (1 + alreadyAdded) > random.random01();
if (willAdd) {
enabled = this->clientId == 0;
scale = std::max(random.random01(), 0.1);
clogginess = std::max(random.random01(), 0.1);
swizzleClog = random.random01() < 0.3;
iterate = random.random01() < 0.5;
}
return willAdd;
bool shouldInject(DeterministicRandom& random,
const WorkloadRequest& work,
const unsigned alreadyAdded) const override {
return work.useDatabase && 0.25 / (1 + alreadyAdded) > random.random01();
}
void initFailureInjectionMode(DeterministicRandom& random) override {
enabled = this->clientId == 0;
scale = std::max(random.random01(), 0.1);
clogginess = std::max(random.random01(), 0.1);
swizzleClog = random.random01() < 0.3;
iterate = random.random01() < 0.5;
}
std::string description() const override {

View File

@ -27,6 +27,7 @@
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/QuietDatabase.h"
#include "flow/DeterministicRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct MoveKeysWorkload : FailureInjectionWorkload {
@ -50,6 +51,12 @@ struct MoveKeysWorkload : FailureInjectionWorkload {
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return _start(cx, this); }
bool shouldInject(DeterministicRandom& random,
const WorkloadRequest& work,
const unsigned alreadyAdded) const override {
return alreadyAdded < 1 && work.useDatabase && 0.1 / (1 + alreadyAdded) > random.random01();
}
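// Note (illustrative): alreadyAdded < 1 means at most one MoveKeysWorkload injector is ever
// added to a test, and only with probability 0.1.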
ACTOR Future<Void> _start(Database cx, MoveKeysWorkload* self) {
if (self->enabled) {
// Get the database configuration so as to use proper team size

View File

@ -22,7 +22,6 @@
#include <utility>
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
@ -378,8 +377,6 @@ struct ReadWriteWorkload : ReadWriteCommon {
bool adjacentReads; // keys are adjacent within a transaction
bool adjacentWrites;
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
int readType;
bool cacheResult;
Optional<Key> transactionTag;
int transactionsTagThrottled{ 0 };
@ -402,8 +399,6 @@ struct ReadWriteWorkload : ReadWriteCommon {
rampUpConcurrency = getOption(options, "rampUpConcurrency"_sr, false);
batchPriority = getOption(options, "batchPriority"_sr, false);
descriptionString = getOption(options, "description"_sr, "ReadWrite"_sr);
readType = getOption(options, "readType"_sr, 3);
cacheResult = getOption(options, "cacheResult"_sr, true);
if (hasOption(options, "transactionTag"_sr)) {
transactionTag = getOption(options, "transactionTag"_sr, ""_sr);
}
@ -433,10 +428,6 @@ struct ReadWriteWorkload : ReadWriteCommon {
if (transactionTag.present() && tr.getTags().size() == 0) {
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, transactionTag.get());
}
ReadOptions options;
options.type = static_cast<ReadType>(readType);
options.cacheResult = cacheResult;
tr.getTransaction().trState->readOptions = options;
}
std::string description() const override { return descriptionString.toString(); }
@ -512,6 +503,7 @@ struct ReadWriteWorkload : ReadWriteCommon {
state double startTime = now();
loop {
state Transaction tr(cx);
try {
self->setupTransaction(tr);
wait(self->readOp(&tr, keys, self, false));

View File

@ -47,7 +47,7 @@ struct RollbackWorkload : FailureInjectionWorkload {
multiple = getOption(options, "multiple"_sr, multiple);
}
void initFailureInjectionMode(DeterministicRandom& random, unsigned count) override {
void initFailureInjectionMode(DeterministicRandom& random) override {
enabled = clientId == 0;
multiple = random.coinflip();
enableFailures = random.random01() < 0.2;
@ -102,11 +102,12 @@ struct RollbackWorkload : FailureInjectionWorkload {
wait(delay(self->clogDuration / 3));
system = self->dbInfo->get();
// Kill the proxy and clog the unclogged tlog
if (self->enableFailures) {
g_simulator->killProcess(g_simulator->getProcessByAddress(proxy.address()), ISimulator::KillInstantly);
// Reboot the proxy and clog the unclogged tlog.
g_simulator->rebootProcess(g_simulator->getProcessByAddress(proxy.address()), ISimulator::Reboot);
g_simulator->clogInterface(uncloggedTLog.ip, self->clogDuration, ClogAll);
} else {
// Alternatively, if we're not injecting machine failures, clog the proxy and the unclogged tlog.
g_simulator->clogInterface(proxy.address().ip, self->clogDuration, ClogAll);
g_simulator->clogInterface(uncloggedTLog.ip, self->clogDuration, ClogAll);
}

View File

@ -67,6 +67,8 @@ struct SSCheckpointRestoreWorkload : TestWorkload {
return _start(this, cx);
}
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("MoveKeysWorkload"); }
ACTOR Future<Void> _start(SSCheckpointRestoreWorkload* self, Database cx) {
state Key key = "TestKey"_sr;
state Key endKey = "TestKey0"_sr;

View File

@ -3729,6 +3729,7 @@ void registerCrashHandler() {
sigaction(SIGBUS, &action, nullptr);
sigaction(SIGUSR2, &action, nullptr);
sigaction(SIGTERM, &action, nullptr);
sigaction(SIGABRT, &action, nullptr);
#else
// No crash handler for other platforms!
#endif

View File

@ -81,11 +81,11 @@ set(FDB_PV_NETWORK_ADDRESS_HOSTNAME_FLAG "0x0FDB00B071010000LL")
set(FDB_PV_STORAGE_METADATA "0x0FDB00B071010000LL")
set(FDB_PV_PERPETUAL_WIGGLE_METADATA "0x0FDB00B071010000LL")
set(FDB_PV_STORAGE_INTERFACE_READINESS "0x0FDB00B071010000LL")
set(FDB_PV_RESOLVER_PRIVATE_MUTATIONS "0x0FDB00B071010000LL")
set(FDB_PV_TENANTS "0x0FDB00B071010000LL")
set(FDB_PV_RESOLVER_PRIVATE_MUTATIONS "0x0FDB00B072000000LL")
set(FDB_PV_OTEL_SPAN_CONTEXT "0x0FDB00B072000000LL")
set(FDB_PV_SW_VERSION_TRACKING "0x0FDB00B072000000LL")
set(FDB_PV_ENCRYPTION_AT_REST "0x0FDB00B072000000LL")
set(FDB_PV_SHARD_ENCODE_LOCATION_METADATA "0x0FDB00B072000000LL")
set(FDB_PV_TENANTS "0x0FDB00B072000000LL")
set(FDB_PV_BLOB_GRANULE_FILE "0x0FDB00B072000000LL")
set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL")
set(FDB_PV_CLUSTER_ID_SPECIAL_KEY "0x0FDB00B072000000LL")

View File

@ -678,7 +678,7 @@ inline static void flushOutputStreams() {
#error Missing symbol export
#endif
#define crashAndDie() (*(volatile int*)0 = 0)
#define crashAndDie() std::abort()
#ifdef _WIN32
#define strcasecmp stricmp

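These two hunks are connected: crashAndDie() now terminates through std::abort(), which raises SIGABRT, so the sigaction hunk above registers SIGABRT to keep producing a crash trace on deliberate crashes. A minimal sketch of the pattern, with crashHandler as an assumed handler name for illustration:

#include <signal.h>
#include <cstdlib>

extern "C" void crashHandler(int sig); // assumed: logs a backtrace, then exits

void registerAbortHandler() {
	struct sigaction action = {};
	action.sa_handler = crashHandler;
	// Without this registration, std::abort() would terminate the process
	// without running the custom crash trace.
	sigaction(SIGABRT, &action, nullptr);
}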
View File

@ -1,358 +0,0 @@
/*
* PriorityMultiLock.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_PRIORITYMULTILOCK_ACTOR_G_H)
#define FLOW_PRIORITYMULTILOCK_ACTOR_G_H
#include "flow/PriorityMultiLock.actor.g.h"
#elif !defined(PRIORITYMULTILOCK_ACTOR_H)
#define PRIORITYMULTILOCK_ACTOR_H
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define PRIORITYMULTILOCK_DEBUG 0
#if PRIORITYMULTILOCK_DEBUG
#define pml_debug_printf(...) printf(__VA_ARGS__)
#else
#define pml_debug_printf(...)
#endif
// A multi-user lock with a concurrent holder limit where waiters request a lock with a priority
// id and are granted locks based on a total concurrency and the relative importance of the
// priority ids defined.
//
// Scheduling logic
// launchLimits[n] = configured amount from the launchLimit vector for priority n
// waiters[n] = the number of waiters for priority n
// runnerCounts[n] = number of runners at priority n
//
// totalActiveLaunchLimits = sum of limits for all priorities with waiters
// When waiters[n] becomes == 0, totalActiveLaunchLimits -= launchLimits[n]
// When waiters[n] becomes > 0, totalActiveLaunchLimits += launchLimits[n]
//
// The total capacity of a priority to be considered when launching tasks is
// ceil(launchLimits[n] / totalActiveLaunchLimits * concurrency)
//
// The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
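// Worked example of the capacity formula above (an editor's illustration, not part of
// the original header): with concurrency = 10 and launch limits {1, 2, 7}, suppose
// only priorities 0 and 2 currently have waiters. Then
//   totalActiveLaunchLimits = 1 + 7 = 8
//   currentCapacity(priority 0) = ceil(1/8 * 10) = 2
//   currentCapacity(priority 2) = ceil(7/8 * 10) = 9
// The per-priority capacities can sum to more than concurrency because of the
// ceil(); the available slot count is what actually bounds the total runners.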
class PriorityMultiLock {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>
// Calling release() is not necessary; it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
PriorityMultiLock(int concurrency, std::string launchLimits)
: PriorityMultiLock(concurrency, parseStringToVector<int>(launchLimits, ',')) {}
PriorityMultiLock(int concurrency, std::vector<int> launchLimitsByPriority)
: concurrency(concurrency), available(concurrency), waiting(0), totalActiveLaunchLimits(0) {
priorities.resize(launchLimitsByPriority.size());
for (int i = 0; i < priorities.size(); ++i) {
priorities[i].launchLimit = launchLimitsByPriority[i];
}
fRunner = runner(this);
}
~PriorityMultiLock() {}
Future<Lock> lock(int priority = 0) {
Priority& p = priorities[priority];
Queue& q = p.queue;
Waiter w;
// If this priority currently has no waiters
if (q.empty()) {
// Add this priority's launch limit to totalActiveLaunchLimits
totalActiveLaunchLimits += p.launchLimit;
// If there are slots available and the priority has capacity then don't make the caller wait
if (available > 0 && p.runners < currentCapacity(p.launchLimit)) {
// Remove this priority's launch limit from the total since its queue will remain empty
totalActiveLaunchLimits -= p.launchLimit;
// Return a Lock to the caller
Lock lock;
addRunner(lock, &p);
pml_debug_printf("lock nowait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return lock;
}
}
q.push_back(w);
++waiting;
pml_debug_printf("lock wait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return w.lockPromise.getFuture();
}
void kill() {
for (int i = 0; i < runners.size(); ++i) {
if (!runners[i].isReady()) {
runners[i].cancel();
}
}
runners.clear();
brokenOnDestruct.sendError(broken_promise());
waiting = 0;
priorities.clear();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone);
for (int i = 0; i < priorities.size(); ++i) {
s += format("p%d:{%s} ", i, priorities[i].toString(this).c_str());
}
s += "}";
return s;
}
int maxPriority() const { return priorities.size() - 1; }
int totalWaiters() const { return waiting; }
int numWaiters(const unsigned int priority) const {
ASSERT(priority < priorities.size());
return priorities[priority].queue.size();
}
int totalRunners() const { return concurrency - available; }
int numRunners(const unsigned int priority) const {
ASSERT(priority < priorities.size());
return priorities[priority].runners;
}
private:
struct Waiter {
Waiter() : queuedTime(now()) {}
Promise<Lock> lockPromise;
double queuedTime;
};
// Total execution slots allowed across all priorities
int concurrency;
// Current available execution slots
int available;
// Total waiters across all priorities
int waiting;
// Sum of launch limits for all priorities with 1 or more waiters
int totalActiveLaunchLimits;
typedef Deque<Waiter> Queue;
struct Priority {
Priority() : runners(0), launchLimit(0) {}
// Queue of waiters at this priority
Queue queue;
// Number of runners at this priority
int runners;
// Configured launch limit for this priority
int launchLimit;
std::string toString(const PriorityMultiLock* pml) const {
return format("limit=%d run=%d wait=%d cap=%d",
launchLimit,
runners,
queue.size(),
queue.empty() ? 0 : pml->currentCapacity(launchLimit));
}
};
std::vector<Priority> priorities;
// Current or recent (ended) runners
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger wakeRunner;
Promise<Void> brokenOnDestruct;
ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Future<Void> f, Priority* priority) {
try {
wait(f);
} catch (Error& e) {
}
++self->available;
priority->runners -= 1;
pml_debug_printf("lock release line %d priority %d %s\n",
__LINE__,
(int)(priority - &self->priorities.front()),
self->toString().c_str());
// If there are any waiters or if the runners array is getting large, trigger the runner loop
if (self->waiting > 0 || self->runners.size() > 1000) {
self->wakeRunner.trigger();
}
return Void();
}
void addRunner(Lock& lock, Priority* p) {
p->runners += 1;
--available;
runners.push_back(handleRelease(this, lock.promise.getFuture(), p));
}
// Current maximum running tasks for the specified priority, which must have waiters
// or the result is undefined
int currentCapacity(int launchLimit) const {
// The total concurrency allowed for this priority at present is the total concurrency times the
// priority's launch limit divided by the total launch limits for all priorities with waiters.
return ceil((float)launchLimit / totalActiveLaunchLimits * concurrency);
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
// Priority to try to run tasks from next
state int priority = 0;
loop {
pml_debug_printf(
"runner loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
pml_debug_printf(
"runner loop waitTrigger line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
wait(self->wakeRunner.onTrigger());
pml_debug_printf(
"runner loop wake line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
if (++sinceYield == 100) {
sinceYield = 0;
pml_debug_printf(
" runner waitDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
wait(delay(0));
pml_debug_printf(
" runner afterDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
pml_debug_printf(
" launch loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
Priority* pPriority;
// Find the next priority with waiters and capacity. There must be at least one.
loop {
// Rotate to next priority
if (++priority == self->priorities.size()) {
priority = 0;
}
pPriority = &self->priorities[priority];
pml_debug_printf(" launch loop scan line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
if (!pPriority->queue.empty() &&
pPriority->runners < self->currentCapacity(pPriority->launchLimit)) {
break;
}
}
Queue& queue = pPriority->queue;
Waiter w = queue.front();
queue.pop_front();
// If this priority is now empty, subtract its launch limit from totalActiveLaunchLimits
if (queue.empty()) {
self->totalActiveLaunchLimits -= pPriority->launchLimit;
pml_debug_printf(" emptied priority line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
}
--self->waiting;
Lock lock;
w.lockPromise.send(lock);
// Self may have been destructed during the lock callback
if (error.isReady()) {
throw error.getError();
}
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->addRunner(lock, pPriority);
}
pml_debug_printf(" launched line %d alreadyDone=%d priority=%d %s\n",
__LINE__,
!lock.promise.canBeSet(),
priority,
self->toString().c_str());
}
}
}
};
#include "flow/unactorcompiler.h"
#endif

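For reference, a minimal usage sketch of the class deleted above, following the Usage comment in its header; doWork() and the limit values are illustrative assumptions:

ACTOR Future<Void> doThrottledWork(PriorityMultiLock* pml, int priorityLevel) {
	// Blocks until the scheduler grants a slot to this priority.
	state PriorityMultiLock::Lock lock = wait(pml->lock(priorityLevel));
	wait(doWork()); // hypothetical work; the slot is held while it runs
	lock.release(); // optional; letting all copies of lock go out of scope also releases
	return Void();
}

// 10 concurrent slots shared by priorities 0..2 with launch limits 1:2:7.
PriorityMultiLock pml(10, "1,2,7");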
View File

@ -100,21 +100,6 @@ T sorted(T range) {
return range;
}
template <class T>
std::vector<T> parseStringToVector(std::string str, char delim) {
std::vector<T> result;
std::stringstream stream(str);
std::string token;
while (stream.good()) {
getline(stream, token, delim);
std::istringstream tokenStream(token);
T item;
tokenStream >> item;
result.push_back(item);
}
return result;
}
template <class T>
ErrorOr<T> errorOr(T t) {
return ErrorOr<T>(t);

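parseStringToVector is removed together with PriorityMultiLock's string constructor above, which used it. For reference, a one-line sketch of its behavior:

// parseStringToVector<int>("1,2,7", ',') returned std::vector<int>{ 1, 2, 7 }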
View File

@ -1,87 +0,0 @@
/*
* BenchStream.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "benchmark/benchmark.h"
#include "flow/flow.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/PriorityMultiLock.actor.h"
#include <deque>
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
state std::vector<int> priorities;
// Set up priority list with limits 10, 20, 30, ...
while (priorities.size() < benchState->range(0)) {
priorities.push_back(10 * (priorities.size() + 1));
}
state int concurrency = priorities.size() * 10;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock by taking concurrency locks
state std::deque<Future<PriorityMultiLock::Lock>> lockFutures;
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(j % priorities.size()));
}
// Wait for all of the initial locks to be taken
// This will work regardless of their priorities as there are only n = concurrency of them
wait(waitForAll(std::vector<Future<PriorityMultiLock::Lock>>(lockFutures.begin(), lockFutures.end())));
// For each iteration of the loop, one new lock user is created, for a total of
// concurrency + 1 users. The new user replaces an old one, which is then waited
// on. This will succeed regardless of the lock priorities used because, prior to the
// new user, there were only n = concurrency users, so they will all be served before
// the new user.
state int p = 0;
state int i = 0;
while (benchState->KeepRunning()) {
// Get and replace the i'th lock future with a new lock waiter
Future<PriorityMultiLock::Lock> f = lockFutures[i];
lockFutures[i] = pml->lock(p);
PriorityMultiLock::Lock lock = wait(f);
// Rotate to another priority
if (++p == priorities.size()) {
p = 0;
}
// Rotate to next lock index
if (++i == lockFutures.size()) {
i = 0;
}
}
benchState->SetItemsProcessed(static_cast<long>(benchState->iterations()));
delete pml;
return Void();
}
static void bench_priorityMultiLock(benchmark::State& benchState) {
onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
}
BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);