Merge branch 'main' of github.com:apple/foundationdb into metacluster-assigned-cluster

Jon Fu 2022-09-22 14:39:27 -07:00
commit e342a9db43
35 changed files with 1582 additions and 390 deletions


@ -845,6 +845,57 @@ def tenant_old_commands(logger):
    assert rename_output == rename_output_old
    assert delete_output == delete_output_old


@enable_logging()
def tenant_group_list(logger):
    output = run_fdbcli_command('tenantgroup list')
    assert output == 'The cluster has no tenant groups'
    setup_tenants(['tenant', 'tenant2 tenant_group=tenant_group2', 'tenant3 tenant_group=tenant_group3'])
    output = run_fdbcli_command('tenantgroup list')
    assert output == '1. tenant_group2\n  2. tenant_group3'
    output = run_fdbcli_command('tenantgroup list a z 1')
    assert output == '1. tenant_group2'
    output = run_fdbcli_command('tenantgroup list a tenant_group3')
    assert output == '1. tenant_group2'
    output = run_fdbcli_command('tenantgroup list tenant_group3 z')
    assert output == '1. tenant_group3'
    output = run_fdbcli_command('tenantgroup list a b')
    assert output == 'The cluster has no tenant groups in the specified range'
    output = run_fdbcli_command_and_get_error('tenantgroup list b a')
    assert output == 'ERROR: end must be larger than begin'
    output = run_fdbcli_command_and_get_error('tenantgroup list a b 12x')
    assert output == 'ERROR: invalid limit `12x\''


@enable_logging()
def tenant_group_get(logger):
    setup_tenants(['tenant tenant_group=tenant_group'])
    output = run_fdbcli_command('tenantgroup get tenant_group')
    assert output == 'The tenant group is present in the cluster'
    output = run_fdbcli_command('tenantgroup get tenant_group JSON')
    json_output = json.loads(output, strict=False)
    assert(len(json_output) == 2)
    assert('tenant_group' in json_output)
    assert(json_output['type'] == 'success')
    assert(len(json_output['tenant_group']) == 0)
    output = run_fdbcli_command_and_get_error('tenantgroup get tenant_group2')
    assert output == 'ERROR: tenant group not found'
    output = run_fdbcli_command('tenantgroup get tenant_group2 JSON')
    json_output = json.loads(output, strict=False)
    assert(len(json_output) == 2)
    assert(json_output['type'] == 'error')
    assert(json_output['error'] == 'tenant group not found')


def tenants():
    run_tenant_test(tenant_create)
    run_tenant_test(tenant_delete)
@ -854,6 +905,8 @@ def tenants():
    run_tenant_test(tenant_rename)
    run_tenant_test(tenant_usetenant)
    run_tenant_test(tenant_old_commands)
    run_tenant_test(tenant_group_list)
    run_tenant_test(tenant_group_get)


def integer_options():
    process = subprocess.Popen(command_template[:-1], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fdbcli_env)


@ -57,19 +57,27 @@ function(compile_boost)
# Build boost
include(ExternalProject)
set(BOOST_INSTALL_DIR "${CMAKE_BINARY_DIR}/boost_install")
ExternalProject_add("${COMPILE_BOOST_TARGET}Project"
URL "https://boostorg.jfrog.io/artifactory/main/release/1.78.0/source/boost_1_78_0.tar.bz2"
URL_HASH SHA256=8681f175d4bdb26c52222665793eef08490d7758529330f98d3b29dd0735bccc
CONFIGURE_COMMAND ${BOOTSTRAP_COMMAND} ${BOOTSTRAP_ARGS} --with-libraries=${BOOTSTRAP_LIBRARIES} --with-toolset=${BOOST_TOOLSET}
BUILD_COMMAND ${B2_COMMAND} link=static ${COMPILE_BOOST_BUILD_ARGS} --prefix=${BOOST_INSTALL_DIR} ${USER_CONFIG_FLAG} install
BUILD_IN_SOURCE ON
INSTALL_COMMAND ""
UPDATE_COMMAND ""
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/boost/config.hpp"
"${BOOST_INSTALL_DIR}/lib/libboost_context.a"
"${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a"
"${BOOST_INSTALL_DIR}/lib/libboost_iostreams.a")
URL "https://boostorg.jfrog.io/artifactory/main/release/1.78.0/source/boost_1_78_0.tar.bz2"
URL_HASH SHA256=8681f175d4bdb26c52222665793eef08490d7758529330f98d3b29dd0735bccc
CONFIGURE_COMMAND ${BOOTSTRAP_COMMAND}
${BOOTSTRAP_ARGS}
--with-libraries=${BOOTSTRAP_LIBRARIES}
--with-toolset=${BOOST_TOOLSET}
BUILD_COMMAND ${B2_COMMAND}
link=static
${COMPILE_BOOST_BUILD_ARGS}
--prefix=${BOOST_INSTALL_DIR}
${USER_CONFIG_FLAG} install
BUILD_IN_SOURCE ON
INSTALL_COMMAND ""
UPDATE_COMMAND ""
BUILD_BYPRODUCTS "${BOOST_INSTALL_DIR}/boost/config.hpp"
"${BOOST_INSTALL_DIR}/lib/libboost_context.a"
"${BOOST_INSTALL_DIR}/lib/libboost_filesystem.a"
"${BOOST_INSTALL_DIR}/lib/libboost_iostreams.a")
add_library(${COMPILE_BOOST_TARGET}_context STATIC IMPORTED)
add_dependencies(${COMPILE_BOOST_TARGET}_context ${COMPILE_BOOST_TARGET}Project)

cmake/CompileZstd.cmake (new file, 23 lines)

@ -0,0 +1,23 @@
# Compile zstd
function(compile_zstd)
include(FetchContent)
set(ZSTD_SOURCE_DIR ${CMAKE_BINARY_DIR}/zstd)
FetchContent_Declare(
ZSTD
GIT_REPOSITORY https://github.com/facebook/zstd.git
GIT_TAG v1.5.2
SOURCE_DIR ${ZSTD_SOURCE_DIR}
BINARY_DIR ${ZSTD_SOURCE_DIR}
SOURCE_SUBDIR "build/cmake"
)
FetchContent_MakeAvailable(ZSTD)
add_library(ZSTD::ZSTD STATIC IMPORTED)
set_target_properties(ZSTD::ZSTD PROPERTIES IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/lib/libzstd.a")
target_include_directories(ZSTD::ZSTD PUBLIC ${ZSTD_INCLUDE_DIRS})
endfunction(compile_zstd)


@ -1 +1,2 @@
using @BOOST_TOOLSET@ : : @BOOST_CXX_COMPILER@ : @BOOST_ADDITIONAL_COMPILE_OPTIONS@ ;
using zstd : 1.5.2 : <include>/@CMAKE_BINARY_DIR@/zstd/lib <search>/@CMAKE_BINARY_DIR@/lib ;


@ -522,6 +522,50 @@ Changes the name of an existing tenant.
``NEW_NAME`` - the desired name of the tenant. This name must not already be in use.
tenantgroup
-----------
The ``tenantgroup`` command is used to view details about the tenant groups in a cluster. The ``tenantgroup`` command has the following subcommands:
list
^^^^
``tenantgroup list [BEGIN] [END] [LIMIT]``
Lists the tenant groups present in the cluster.
``BEGIN`` - the first tenant group to list. Defaults to the empty tenant group name ``""``.
``END`` - the exclusive end tenant group to list. Defaults to ``\xff\xff``.
``LIMIT`` - the number of tenant groups to list. Defaults to 100.
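For example, with the tenant groups created by the fdbcli tests in this change (the group names are illustrative)::

    fdb> tenantgroup list
      1. tenant_group2
      2. tenant_group3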
get
^^^
``tenantgroup get <NAME> [JSON]``
Prints the metadata for a tenant group.
``NAME`` - the name of the tenant group to print.
``JSON`` - if specified, the output of the command will be printed in the form of a JSON string::
    {
        "tenant_group": {
            "assigned_cluster": "cluster1"
        },
        "type": "success"
    }
In the event of an error, the JSON output will include an error message::
    {
        "error": "...",
        "type": "error"
    }
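In the default (non-JSON) form, a tenant group that is assigned to a cluster prints its assignment, e.g. ``assigned cluster: cluster1``. On a cluster that is not part of a metacluster, where tenant groups currently carry no metadata, the command instead prints ``The tenant group is present in the cluster``.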
throttle
--------


@ -263,7 +263,7 @@ ACTOR Future<bool> tenantListCommand(Reference<IDatabase> db, std::vector<String
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
state std::vector<TenantNameRef> tenantNames;
state std::vector<TenantName> tenantNames;
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
wait(MetaclusterAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));


@ -0,0 +1,240 @@
/*
* TenantGroupCommands.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbcli/fdbcli.actor.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/GenericManagementAPI.actor.h"
#include "fdbclient/IClientApi.h"
#include "fdbclient/Knobs.h"
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/MetaclusterManagement.actor.h"
#include "fdbclient/TenantManagement.actor.h"
#include "fdbclient/Schemas.h"
#include "flow/Arena.h"
#include "flow/FastRef.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
namespace fdb_cli {
// tenantgroup list command
ACTOR Future<bool> tenantGroupListCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() > 5) {
fmt::print("Usage: tenantgroup list [BEGIN] [END] [LIMIT]\n\n");
fmt::print("Lists the tenant groups in a cluster.\n");
fmt::print("Only tenant groups in the range BEGIN - END will be printed.\n");
fmt::print("An optional LIMIT can be specified to limit the number of results (default 100).\n");
return false;
}
state StringRef beginTenantGroup = ""_sr;
state StringRef endTenantGroup = "\xff\xff"_sr;
state int limit = 100;
if (tokens.size() >= 3) {
beginTenantGroup = tokens[2];
}
if (tokens.size() >= 4) {
endTenantGroup = tokens[3];
if (endTenantGroup <= beginTenantGroup) {
fmt::print(stderr, "ERROR: end must be larger than begin");
return false;
}
}
if (tokens.size() == 5) {
int n = 0;
if (sscanf(tokens[4].toString().c_str(), "%d%n", &limit, &n) != 1 || n != tokens[4].size() || limit <= 0) {
fmt::print(stderr, "ERROR: invalid limit `{}'\n", tokens[4].toString());
return false;
}
}
state Reference<ITransaction> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
state std::vector<TenantGroupName> tenantGroupNames;
state std::vector<std::pair<TenantGroupName, TenantGroupEntry>> tenantGroups;
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
wait(store(tenantGroups,
MetaclusterAPI::listTenantGroupsTransaction(tr, beginTenantGroup, endTenantGroup, limit)));
} else {
wait(store(tenantGroups,
TenantAPI::listTenantGroupsTransaction(tr, beginTenantGroup, endTenantGroup, limit)));
}
if (tenantGroups.empty()) {
if (tokens.size() == 2) {
fmt::print("The cluster has no tenant groups\n");
} else {
fmt::print("The cluster has no tenant groups in the specified range\n");
}
}
int index = 0;
for (auto tenantGroup : tenantGroups) {
fmt::print(" {}. {}\n", ++index, printable(tenantGroup.first));
}
return true;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
// tenantgroup get command
ACTOR Future<bool> tenantGroupGetCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() > 4 || (tokens.size() == 4 && tokens[3] != "JSON"_sr)) {
fmt::print("Usage: tenantgroup get <NAME> [JSON]\n\n");
fmt::print("Prints metadata associated with the given tenant group.\n");
fmt::print("If JSON is specified, then the output will be in JSON format.\n");
return false;
}
state bool useJson = tokens.size() == 4;
state Reference<ITransaction> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
state ClusterType clusterType = wait(TenantAPI::getClusterType(tr));
state std::string tenantJson;
state Optional<TenantGroupEntry> entry;
if (clusterType == ClusterType::METACLUSTER_MANAGEMENT) {
wait(store(entry, MetaclusterAPI::tryGetTenantGroupTransaction(tr, tokens[2])));
} else {
wait(store(entry, TenantAPI::tryGetTenantGroupTransaction(tr, tokens[2])));
Optional<MetaclusterRegistrationEntry> metaclusterRegistration =
wait(MetaclusterMetadata::metaclusterRegistration().get(tr));
// We don't store assigned clusters in the tenant group entry on data clusters, so we can instead
// populate it from the metacluster registration
if (entry.present() && metaclusterRegistration.present() &&
metaclusterRegistration.get().clusterType == ClusterType::METACLUSTER_DATA &&
!entry.get().assignedCluster.present()) {
entry.get().assignedCluster = metaclusterRegistration.get().name;
}
}
if (!entry.present()) {
throw tenant_not_found();
}
if (useJson) {
json_spirit::mObject resultObj;
resultObj["tenant_group"] = entry.get().toJson();
resultObj["type"] = "success";
fmt::print("{}\n",
json_spirit::write_string(json_spirit::mValue(resultObj), json_spirit::pretty_print));
} else {
if (entry.get().assignedCluster.present()) {
fmt::print(" assigned cluster: {}\n", printable(entry.get().assignedCluster));
} else {
// This is a placeholder output for when a tenant group is read in a non-metacluster, where
// it currently has no metadata. When metadata is eventually added, we can print that instead.
fmt::print("The tenant group is present in the cluster\n");
}
}
return true;
} catch (Error& e) {
try {
wait(safeThreadFutureToFuture(tr->onError(e)));
} catch (Error& finalErr) {
state std::string errorStr;
if (finalErr.code() == error_code_tenant_not_found) {
errorStr = "tenant group not found";
} else if (useJson) {
errorStr = finalErr.what();
} else {
throw finalErr;
}
if (useJson) {
json_spirit::mObject resultObj;
resultObj["type"] = "error";
resultObj["error"] = errorStr;
fmt::print("{}\n",
json_spirit::write_string(json_spirit::mValue(resultObj), json_spirit::pretty_print));
} else {
fmt::print(stderr, "ERROR: {}\n", errorStr);
}
return false;
}
}
}
}
// tenantgroup command
Future<bool> tenantGroupCommand(Reference<IDatabase> db, std::vector<StringRef> tokens) {
if (tokens.size() == 1) {
printUsage(tokens[0]);
return true;
} else if (tokencmp(tokens[1], "list")) {
return tenantGroupListCommand(db, tokens);
} else if (tokencmp(tokens[1], "get")) {
return tenantGroupGetCommand(db, tokens);
} else {
printUsage(tokens[0]);
return true;
}
}
void tenantGroupGenerator(const char* text,
const char* line,
std::vector<std::string>& lc,
std::vector<StringRef> const& tokens) {
if (tokens.size() == 1) {
const char* opts[] = { "list", "get", nullptr };
arrayGenerator(text, line, opts, lc);
} else if (tokens.size() == 3 && tokencmp(tokens[1], "get")) {
const char* opts[] = { "JSON", nullptr };
arrayGenerator(text, line, opts, lc);
}
}
std::vector<const char*> tenantGroupHintGenerator(std::vector<StringRef> const& tokens, bool inArgument) {
if (tokens.size() == 1) {
return { "<list|get>", "[ARGS]" };
} else if (tokencmp(tokens[1], "list") && tokens.size() < 5) {
static std::vector<const char*> opts = { "[BEGIN]", "[END]", "[LIMIT]" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else if (tokencmp(tokens[1], "get") && tokens.size() < 4) {
static std::vector<const char*> opts = { "<NAME>", "[JSON]" };
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
} else {
return {};
}
}
CommandFactory tenantGroupRegisterFactory("tenantgroup",
CommandHelp("tenantgroup <list|get> [ARGS]",
"view tenant group information",
"`list' prints a list of tenant groups in the cluster.\n"
"`get' prints the metadata for a particular tenant group.\n"),
&tenantGroupGenerator,
&tenantGroupHintGenerator);
} // namespace fdb_cli


@ -1902,6 +1902,13 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
continue;
}
if (tokencmp(tokens[0], "tenantgroup")) {
bool _result = wait(makeInterruptable(tenantGroupCommand(db, tokens)));
if (!_result)
is_error = true;
continue;
}
if (tokencmp(tokens[0], "metacluster")) {
bool _result = wait(makeInterruptable(metaclusterCommand(db, tokens)));
if (!_result)


@ -239,6 +239,8 @@ ACTOR Future<bool> suspendCommandActor(Reference<IDatabase> db,
Future<bool> tenantCommand(Reference<IDatabase> db, std::vector<StringRef> tokens);
// tenant command compatibility layer
Future<bool> tenantCommandForwarder(Reference<IDatabase> db, std::vector<StringRef> tokens);
// tenantgroup command
Future<bool> tenantGroupCommand(Reference<IDatabase> db, std::vector<StringRef> tokens);
// throttle command
ACTOR Future<bool> throttleCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
// triggerteaminfolog command


@ -60,21 +60,6 @@ uint16_t MIN_SUPPORTED_BG_FORMAT_VERSION = 1;
const uint8_t SNAPSHOT_FILE_TYPE = 'S';
const uint8_t DELTA_FILE_TYPE = 'D';
static int getDefaultCompressionLevel(CompressionFilter filter) {
if (filter == CompressionFilter::NONE) {
return -1;
#ifdef ZLIB_LIB_SUPPORTED
} else if (filter == CompressionFilter::GZIP) {
// opt for high speed compression, larger levels have a high cpu cost and not much compression ratio
// improvement, according to benchmarks
return 1;
#endif
} else {
ASSERT(false);
return -1;
}
}
// Deltas in key order
// For key-ordered delta files, the format for both sets and range clears is that you store boundaries ordered by key.
@ -475,8 +460,10 @@ struct IndexBlobGranuleFileChunkRef {
const CompressionFilter compFilter,
Arena& arena) {
chunkRef.compressionFilter = compFilter;
chunkRef.buffer = CompressionUtils::compress(
chunkRef.compressionFilter.get(), chunk.contents(), getDefaultCompressionLevel(compFilter), arena);
chunkRef.buffer = CompressionUtils::compress(chunkRef.compressionFilter.get(),
chunk.contents(),
CompressionUtils::getDefaultCompressionLevel(compFilter),
arena);
if (BG_ENCRYPT_COMPRESS_DEBUG) {
XXH64_hash_t chunkChksum = XXH3_64bits(chunk.contents().begin(), chunk.contents().size());
@ -2015,11 +2002,7 @@ struct KeyValueGen {
cipherKeys = getCipherKeysCtx(ar);
}
if (deterministicRandom()->coinflip()) {
#ifdef ZLIB_LIB_SUPPORTED
compressFilter = CompressionFilter::GZIP;
#else
compressFilter = CompressionFilter::NONE;
#endif
compressFilter = CompressionUtils::getRandomFilter();
}
}
@ -2199,10 +2182,8 @@ TEST_CASE("/blobgranule/files/validateEncryptionCompression") {
BlobGranuleCipherKeysCtx cipherKeys = getCipherKeysCtx(ar);
std::vector<bool> encryptionModes = { false, true };
std::vector<Optional<CompressionFilter>> compressionModes;
compressionModes.push_back({});
#ifdef ZLIB_LIB_SUPPORTED
compressionModes.push_back(CompressionFilter::GZIP);
#endif
compressionModes.insert(
compressionModes.end(), CompressionUtils::supportedFilters.begin(), CompressionUtils::supportedFilters.end());
std::vector<Value> snapshotValues;
for (bool encryptionMode : encryptionModes) {
@ -2915,10 +2896,8 @@ TEST_CASE("!/blobgranule/files/benchFromFiles") {
std::vector<bool> chunkModes = { false, true };
std::vector<bool> encryptionModes = { false, true };
std::vector<Optional<CompressionFilter>> compressionModes;
compressionModes.push_back({});
#ifdef ZLIB_LIB_SUPPORTED
compressionModes.push_back(CompressionFilter::GZIP);
#endif
compressionModes.insert(
compressionModes.end(), CompressionUtils::supportedFilters.begin(), CompressionUtils::supportedFilters.end());
std::vector<std::string> runNames = { "logical" };
std::vector<std::pair<int64_t, double>> snapshotMetrics;


@ -3936,6 +3936,7 @@ Future<RangeResultFamily> getExactRange(Reference<TransactionState> trState,
req.version = version;
req.begin = firstGreaterOrEqual(range.begin);
req.end = firstGreaterOrEqual(range.end);
setMatchIndex<GetKeyValuesFamilyRequest>(req, matchIndex);
req.spanContext = span.context;
trState->cx->getLatestCommitVersions(


@ -19,6 +19,7 @@
*/
#include "fdbclient/ServerKnobs.h"
#include "flow/CompressionUtils.h"
#include "flow/IRandom.h"
#include "flow/flow.h"
@ -782,6 +783,10 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( QUICK_GET_KEY_VALUES_LIMIT, 2000 );
init( QUICK_GET_KEY_VALUES_LIMIT_BYTES, 1e7 );
init( STORAGE_FEED_QUERY_HARD_LIMIT, 100000 );
init( STORAGE_SERVER_READ_CONCURRENCY, 70 );
// Priorities which each ReadType maps to, in enumeration order
init( STORAGESERVER_READ_RANKS, "0,2,1,1,1" );
init( STORAGESERVER_READ_PRIORITIES, "48,32,8" );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
@ -893,7 +898,6 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_DEFAULT_EXTENT_SIZE, 32 * 1024 * 1024 );
init( REDWOOD_DEFAULT_EXTENT_READ_SIZE, 1024 * 1024 );
init( REDWOOD_EXTENT_CONCURRENT_READS, 4 );
init( REDWOOD_KVSTORE_CONCURRENT_READS, 64 );
init( REDWOOD_KVSTORE_RANGE_PREFETCH, true );
init( REDWOOD_PAGE_REBUILD_MAX_SLACK, 0.33 );
init( REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES, 10 );
@ -906,6 +910,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( REDWOOD_HISTOGRAM_INTERVAL, 30.0 );
init( REDWOOD_EVICT_UPDATED_PAGES, true ); if( randomize && BUGGIFY ) { REDWOOD_EVICT_UPDATED_PAGES = false; }
init( REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT, 2 ); if( randomize && BUGGIFY ) { REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT = deterministicRandom()->randomInt(1, 7); }
init( REDWOOD_PRIORITY_LAUNCHS, "32,32,32,32" );
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );
@ -925,7 +930,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
// encrypt key proxy
init( ENABLE_BLOB_GRANULE_COMPRESSION, false ); if ( randomize && BUGGIFY ) { ENABLE_BLOB_GRANULE_COMPRESSION = deterministicRandom()->coinflip(); }
init( BLOB_GRANULE_COMPRESSION_FILTER, "NONE" );
init( BLOB_GRANULE_COMPRESSION_FILTER, "NONE" ); if ( randomize && BUGGIFY ) { BLOB_GRANULE_COMPRESSION_FILTER = CompressionUtils::toString(CompressionUtils::getRandomFilter()); }
// KMS connector type
init( KMS_CONNECTOR_TYPE, "RESTKmsConnector" );


@ -180,6 +180,15 @@ void TenantMapEntry::configure(Standalone<StringRef> parameter, Optional<Value>
}
}
json_spirit::mObject TenantGroupEntry::toJson() const {
json_spirit::mObject tenantGroupEntry;
if (assignedCluster.present()) {
tenantGroupEntry["assigned_cluster"] = assignedCluster.get().toString();
}
return tenantGroupEntry;
}
TenantMetadataSpecification& TenantMetadata::instance() {
static TenantMetadataSpecification _instance = TenantMetadataSpecification("\xff/"_sr);
return _instance;


@ -1581,13 +1581,7 @@ struct StorageWiggleValue {
}
};
enum class ReadType {
EAGER,
FETCH,
LOW,
NORMAL,
HIGH,
};
enum class ReadType { EAGER = 0, FETCH = 1, LOW = 2, NORMAL = 3, HIGH = 4, MIN = EAGER, MAX = HIGH };
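As a cross-reference: the storage server maps each ReadType to a rank via the new STORAGESERVER_READ_RANKS knob ("0,2,1,1,1"), and each rank is a priority level of its read lock configured by an entry of STORAGESERVER_READ_PRIORITIES ("48,32,8"). Below is a standalone, illustrative sketch of that mapping; the real code path uses parseStringToVector and readPriorityRanks in storageserver.actor.cpp, while this version compiles on its own:

#include <cassert>
#include <sstream>
#include <string>
#include <vector>

enum class ReadType { EAGER = 0, FETCH = 1, LOW = 2, NORMAL = 3, HIGH = 4, MIN = EAGER, MAX = HIGH };

// Parse a comma-separated knob string such as "0,2,1,1,1" into integers.
static std::vector<int> parseCsv(const std::string& csv) {
    std::vector<int> out;
    std::stringstream ss(csv);
    for (std::string tok; std::getline(ss, tok, ',');)
        out.push_back(std::stoi(tok));
    return out;
}

int main() {
    std::vector<int> ranks = parseCsv("0,2,1,1,1"); // STORAGESERVER_READ_RANKS
    std::vector<int> priorities = parseCsv("48,32,8"); // STORAGESERVER_READ_PRIORITIES
    assert(ranks.size() > (size_t)ReadType::MAX); // mirrors the ASSERT in the StorageServer constructor
    // FETCH reads map to rank 2, whose configuration is the third entry (8);
    // LOW/NORMAL/HIGH share rank 1 (32) and EAGER gets rank 0 (48).
    assert(ranks[(int)ReadType::FETCH] == 2 && priorities[2] == 8);
    return 0;
}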
FDB_DECLARE_BOOLEAN_PARAM(CacheResult);


@ -1949,6 +1949,61 @@ Future<Void> renameTenant(Reference<DB> db, TenantName oldName, TenantName newNa
return Void();
}
template <class Transaction>
Future<Optional<TenantGroupEntry>> tryGetTenantGroupTransaction(Transaction tr, TenantGroupName name) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
return ManagementClusterMetadata::tenantMetadata().tenantGroupMap.get(tr, name);
}
ACTOR template <class DB>
Future<Optional<TenantGroupEntry>> tryGetTenantGroup(Reference<DB> db, TenantGroupName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
Optional<TenantGroupEntry> entry = wait(tryGetTenantGroupTransaction(tr, name));
return entry;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<std::vector<std::pair<TenantGroupName, TenantGroupEntry>>> listTenantGroupsTransaction(Transaction tr,
TenantGroupName begin,
TenantGroupName end,
int limit) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
KeyBackedRangeResult<std::pair<TenantGroupName, TenantGroupEntry>> results =
wait(ManagementClusterMetadata::tenantMetadata().tenantGroupMap.getRange(tr, begin, end, limit));
return results.results;
}
ACTOR template <class DB>
Future<std::vector<std::pair<TenantGroupName, TenantGroupEntry>>> listTenantGroups(Reference<DB> db,
TenantGroupName begin,
TenantGroupName end,
int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
std::vector<std::pair<TenantGroupName, TenantGroupEntry>> tenantGroups =
wait(listTenantGroupsTransaction(tr, begin, end, limit));
return tenantGroups;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
} // namespace MetaclusterAPI
#include "flow/unactorcompiler.h"


@ -477,6 +477,7 @@ public:
Database getDatabase() const { return trState->cx; }
static Reference<TransactionLogInfo> createTrLogInfoProbabilistically(const Database& cx);
Transaction& getTransaction() { return *this; }
void setTransactionID(UID id);
void setToken(uint64_t token);


@ -735,6 +735,9 @@ public:
int QUICK_GET_KEY_VALUES_LIMIT;
int QUICK_GET_KEY_VALUES_LIMIT_BYTES;
int STORAGE_FEED_QUERY_HARD_LIMIT;
int STORAGE_SERVER_READ_CONCURRENCY; // Total concurrent reads admitted by the storage server's priority lock
std::string STORAGESERVER_READ_RANKS; // Rank (lock priority) that each ReadType maps to, in enumeration order
std::string STORAGESERVER_READ_PRIORITIES; // Per-rank configuration for the storage server's priority lock
// Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
@ -864,7 +867,6 @@ public:
int REDWOOD_DEFAULT_EXTENT_SIZE; // Extent size for new Redwood files
int REDWOOD_DEFAULT_EXTENT_READ_SIZE; // Extent read size for Redwood files
int REDWOOD_EXTENT_CONCURRENT_READS; // Max number of simultaneous extent disk reads in progress.
int REDWOOD_KVSTORE_CONCURRENT_READS; // Max number of simultaneous point or range reads in progress.
bool REDWOOD_KVSTORE_RANGE_PREFETCH; // Whether to use range read prefetching
double REDWOOD_PAGE_REBUILD_MAX_SLACK; // When rebuilding pages, max slack to allow in page
int REDWOOD_LAZY_CLEAR_BATCH_SIZE_PAGES; // Number of pages to try to pop from the lazy delete queue and process at
@ -883,6 +885,8 @@ public:
bool REDWOOD_EVICT_UPDATED_PAGES; // Whether to prioritize eviction of updated pages from cache.
int REDWOOD_DECODECACHE_REUSE_MIN_HEIGHT; // Minimum height for which to keep and reuse page decode caches
std::string REDWOOD_PRIORITY_LAUNCHS; // Per-priority launch limits for the pager's IO priority lock
// Server request latency measurement
int LATENCY_SAMPLE_SIZE;
double LATENCY_METRICS_LOGGING_INTERVAL;


@ -138,6 +138,8 @@ struct TenantGroupEntry {
TenantGroupEntry() = default;
TenantGroupEntry(Optional<ClusterName> assignedCluster) : assignedCluster(assignedCluster) {}
json_spirit::mObject toJson() const;
Value encode() { return ObjectWriter::toValue(*this, IncludeVersion()); }
static TenantGroupEntry decode(ValueRef const& value) {
return ObjectReader::fromStringRef<TenantGroupEntry>(value, IncludeVersion());


@ -462,8 +462,8 @@ Future<Void> configureTenantTransaction(Transaction tr,
ACTOR template <class Transaction>
Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenantsTransaction(Transaction tr,
TenantNameRef begin,
TenantNameRef end,
TenantName begin,
TenantName end,
int limit) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
@ -598,6 +598,62 @@ Future<Void> renameTenant(Reference<DB> db,
}
}
}
template <class Transaction>
Future<Optional<TenantGroupEntry>> tryGetTenantGroupTransaction(Transaction tr, TenantGroupName name) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
return TenantMetadata::tenantGroupMap().get(tr, name);
}
ACTOR template <class DB>
Future<Optional<TenantGroupEntry>> tryGetTenantGroup(Reference<DB> db, TenantGroupName name) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
Optional<TenantGroupEntry> entry = wait(tryGetTenantGroupTransaction(tr, name));
return entry;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
ACTOR template <class Transaction>
Future<std::vector<std::pair<TenantGroupName, TenantGroupEntry>>> listTenantGroupsTransaction(Transaction tr,
TenantGroupName begin,
TenantGroupName end,
int limit) {
tr->setOption(FDBTransactionOptions::RAW_ACCESS);
KeyBackedRangeResult<std::pair<TenantGroupName, TenantGroupEntry>> results =
wait(TenantMetadata::tenantGroupMap().getRange(tr, begin, end, limit));
return results.results;
}
ACTOR template <class DB>
Future<std::vector<std::pair<TenantGroupName, TenantGroupEntry>>> listTenantGroups(Reference<DB> db,
TenantGroupName begin,
TenantGroupName end,
int limit) {
state Reference<typename DB::TransactionT> tr = db->createTransaction();
loop {
try {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
std::vector<std::pair<TenantGroupName, TenantGroupEntry>> tenantGroups =
wait(listTenantGroupsTransaction(tr, begin, end, limit));
return tenantGroups;
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
} // namespace TenantAPI
#include "flow/unactorcompiler.h"


@ -28,6 +28,7 @@
#include "flow/Trace.h"
#include "flow/flow.h"
#include "flow/Histogram.h"
#include "flow/PriorityMultiLock.actor.h"
#include <limits>
#include <random>
#include "fdbrpc/ContinuousSample.h"
@ -102,201 +103,6 @@ std::string addPrefix(std::string prefix, std::string lines) {
return s;
}
#define PRIORITYMULTILOCK_DEBUG 0
// A multi-user lock with a concurrent holder limit, where waiters are granted the lock according to
// an integer priority from 0 to maxPriority, inclusive; higher integers are given priority.
//
// The interface is similar to FlowMutex except that lock holders can drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>
// Calling release() is not necessary, it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
private:
struct Waiter {
Waiter() : queuedTime(now()) {}
Promise<Lock> lockPromise;
double queuedTime;
};
typedef Deque<Waiter> Queue;
#if PRIORITYMULTILOCK_DEBUG
#define prioritylock_printf(...) printf(__VA_ARGS__)
#else
#define prioritylock_printf(...)
#endif
public:
PriorityMultiLock(int concurrency, int maxPriority, int launchLimit = std::numeric_limits<int>::max())
: concurrency(concurrency), available(concurrency), waiting(0), launchLimit(launchLimit) {
waiters.resize(maxPriority + 1);
fRunner = runner(this);
}
~PriorityMultiLock() { prioritylock_printf("destruct"); }
Future<Lock> lock(int priority = 0) {
prioritylock_printf("lock begin %s\n", toString().c_str());
// This shortcut may enable a waiter to jump the line when the releaser loop yields
if (available > 0) {
--available;
Lock p;
addRunner(p);
prioritylock_printf("lock exit immediate %s\n", toString().c_str());
return p;
}
Waiter w;
waiters[priority].push_back(w);
++waiting;
prioritylock_printf("lock exit queued %s\n", toString().c_str());
return w.lockPromise.getFuture();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone);
for (int i = 0; i < waiters.size(); ++i) {
s += format("p%d_waiters=%u ", i, waiters[i].size());
}
s += "}";
return s;
}
private:
void addRunner(Lock& lock) {
runners.push_back(map(ready(lock.promise.getFuture()), [=](Void) {
prioritylock_printf("Lock released\n");
++available;
if (waiting > 0 || runners.size() > 100) {
release.trigger();
}
return Void();
}));
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
state int maxPriority = self->waiters.size() - 1;
// Priority to try to run tasks from next
state int priority = maxPriority;
state Queue* pQueue = &self->waiters[maxPriority];
// Track the number of waiters unlocked at the same priority in a row
state int lastPriorityCount = 0;
loop {
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
wait(self->release.onTrigger());
prioritylock_printf("runner wakeup %s\n", self->toString().c_str());
if (++sinceYield == 1000) {
sinceYield = 0;
wait(delay(0));
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
prioritylock_printf("Checking priority=%d lastPriorityCount=%d %s\n",
priority,
lastPriorityCount,
self->toString().c_str());
while (!pQueue->empty() && ++lastPriorityCount < self->launchLimit) {
Waiter w = pQueue->front();
pQueue->pop_front();
--self->waiting;
Lock lock;
prioritylock_printf(" Running waiter priority=%d wait=%f %s\n",
priority,
now() - w.queuedTime,
self->toString().c_str());
w.lockPromise.send(lock);
// Self may have been destructed during the lock callback
if (error.isReady()) {
throw error.getError();
}
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->addRunner(lock);
// A slot has been consumed, so stop reading from this queue if there aren't any more
if (--self->available == 0) {
break;
}
}
}
// If there are no more slots available, then don't move to the next priority
if (self->available == 0) {
break;
}
// Decrease priority, wrapping around to max from 0
if (priority == 0) {
priority = maxPriority;
} else {
--priority;
}
pQueue = &self->waiters[priority];
lastPriorityCount = 0;
}
}
}
int concurrency;
int available;
int waiting;
int launchLimit;
std::vector<Queue> waiters;
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger release;
Promise<Void> brokenOnDestruct;
};
// Some convenience functions for debugging to stringify various structures
// Classes can add compatibility by either specializing toString<T> or implementing
// std::string toString() const;
@ -1665,6 +1471,8 @@ struct RedwoodMetrics {
kvSizeReadByGetRange = Reference<Histogram>(
new Histogram(Reference<HistogramRegistry>(), "kvSize", "ReadByGetRange", Histogram::Unit::bytes));
ioLock = nullptr;
// These histograms are used for Btree events, hence level > 0
unsigned int levelCounter = 0;
for (RedwoodMetrics::Level& level : levels) {
@ -1707,6 +1515,8 @@ struct RedwoodMetrics {
// btree levels and one extra level for non btree level.
Level levels[btreeLevels + 1];
metrics metric;
// pointer to the priority multi lock used in pager
PriorityMultiLock* ioLock;
Reference<Histogram> kvSizeWritten;
Reference<Histogram> kvSizeReadByGet;
@ -1761,9 +1571,12 @@ struct RedwoodMetrics {
// The string is a reasonably well formatted page of information
void getFields(TraceEvent* e, std::string* s = nullptr, bool skipZeroes = false);
void getIOLockFields(TraceEvent* e, std::string* s = nullptr);
std::string toString(bool clearAfter) {
std::string s;
getFields(nullptr, &s);
getIOLockFields(nullptr, &s);
if (clearAfter) {
clear();
@ -1798,6 +1611,7 @@ ACTOR Future<Void> redwoodMetricsLogger() {
double elapsed = now() - g_redwoodMetrics.startTime;
e.detail("Elapsed", elapsed);
g_redwoodMetrics.getFields(&e);
g_redwoodMetrics.getIOLockFields(&e);
g_redwoodMetrics.clear();
}
}
@ -2194,7 +2008,7 @@ public:
bool memoryOnly,
Reference<IPageEncryptionKeyProvider> keyProvider,
Promise<Void> errorPromise = {})
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, ioMaxPriority, FLOW_KNOBS->MAX_OUTSTANDING / 2),
: keyProvider(keyProvider), ioLock(FLOW_KNOBS->MAX_OUTSTANDING, SERVER_KNOBS->REDWOOD_PRIORITY_LAUNCHS),
pageCacheBytes(pageCacheSizeBytes), desiredPageSize(desiredPageSize), desiredExtentSize(desiredExtentSize),
filename(filename), memoryOnly(memoryOnly), errorPromise(errorPromise),
remapCleanupWindowBytes(remapCleanupWindowBytes), concurrentExtentReads(new FlowLock(concurrentExtentReads)) {
@ -2206,6 +2020,7 @@ public:
// This sets the page cache size for all PageCacheT instances using the same evictor
pageCache.evictor().sizeLimit = pageCacheBytes;
g_redwoodMetrics.ioLock = &ioLock;
if (!g_redwoodMetricsActor.isValid()) {
g_redwoodMetricsActor = redwoodMetricsLogger();
}
@ -7695,8 +7510,7 @@ RedwoodRecordRef VersionedBTree::dbEnd("\xff\xff\xff\xff\xff"_sr);
class KeyValueStoreRedwood : public IKeyValueStore {
public:
KeyValueStoreRedwood(std::string filename, UID logID, Reference<IPageEncryptionKeyProvider> encryptionKeyProvider)
: m_filename(filename), m_concurrentReads(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS, 0),
prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
: m_filename(filename), prefetch(SERVER_KNOBS->REDWOOD_KVSTORE_RANGE_PREFETCH) {
int pageSize =
BUGGIFY ? deterministicRandom()->randomInt(1000, 4096 * 4) : SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE;
@ -7756,6 +7570,8 @@ public:
ACTOR void shutdown(KeyValueStoreRedwood* self, bool dispose) {
TraceEvent(SevInfo, "RedwoodShutdown").detail("Filename", self->m_filename).detail("Dispose", dispose);
g_redwoodMetrics.ioLock = nullptr;
// In simulation, if the instance is being disposed of then sometimes run destructive sanity check.
if (g_network->isSimulated() && dispose && BUGGIFY) {
// Only proceed if the last commit is a success, but don't throw if it's not because shutdown
@ -7856,7 +7672,6 @@ public:
f.get();
} else {
CODE_PROBE(true, "Uncached forward range read seek");
wait(store(lock, self->m_concurrentReads.lock()));
wait(f);
}
@ -7912,7 +7727,6 @@ public:
f.get();
} else {
CODE_PROBE(true, "Uncached reverse range read seek");
wait(store(lock, self->m_concurrentReads.lock()));
wait(f);
}
@ -7979,9 +7793,6 @@ public:
wait(self->m_tree->initBTreeCursor(
&cur, self->m_tree->getLastCommittedVersion(), PagerEventReasons::PointRead, options));
// Not locking for point reads, instead relying on IO priority lock
// state PriorityMultiLock::Lock lock = wait(self->m_concurrentReads.lock());
++g_redwoodMetrics.metric.opGet;
wait(cur.seekGTE(key));
if (cur.isValid() && cur.get().key == key) {
@ -8017,7 +7828,6 @@ private:
Future<Void> m_init;
Promise<Void> m_closed;
Promise<Void> m_error;
PriorityMultiLock m_concurrentReads;
bool prefetch;
Version m_nextCommitVersion;
Reference<IPageEncryptionKeyProvider> m_keyProvider;
@ -8653,6 +8463,43 @@ void RedwoodMetrics::getFields(TraceEvent* e, std::string* s, bool skipZeroes) {
}
}
void RedwoodMetrics::getIOLockFields(TraceEvent* e, std::string* s) {
if (ioLock == nullptr)
return;
int maxPriority = ioLock->maxPriority();
if (e != nullptr) {
e->detail("ActiveReads", ioLock->totalRunners());
e->detail("AwaitReads", ioLock->totalWaiters());
for (int priority = 0; priority <= maxPriority; ++priority) {
e->detail(format("ActiveP%d", priority), ioLock->numRunners(priority));
e->detail(format("AwaitP%d", priority), ioLock->numWaiters(priority));
}
}
if (s != nullptr) {
std::string active = "Active";
std::string await = "Await";
*s += "\n";
*s += format("%-15s %-8u ", "ActiveReads", ioLock->totalRunners());
*s += format("%-15s %-8u ", "AwaitReads", ioLock->totalWaiters());
*s += "\n";
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (active + 'P' + std::to_string(priority)).c_str(), ioLock->numRunners(priority));
}
*s += "\n";
for (int priority = 0; priority <= maxPriority; ++priority) {
*s +=
format("%-15s %-8u ", (await + 'P' + std::to_string(priority)).c_str(), ioLock->numWaiters(priority));
}
}
}
TEST_CASE("/redwood/correctness/unit/RedwoodRecordRef") {
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[0] == 3);
ASSERT(RedwoodRecordRef::Delta::LengthFormatSizes[1] == 4);
@ -11130,3 +10977,57 @@ TEST_CASE(":/redwood/performance/histograms") {
return Void();
}
ACTOR Future<Void> waitLockIncrement(PriorityMultiLock* pml, int priority, int* pout) {
state PriorityMultiLock::Lock lock = wait(pml->lock(priority));
wait(delay(deterministicRandom()->random01() * .1));
++*pout;
return Void();
}
TEST_CASE("/redwood/PriorityMultiLock") {
state std::vector<int> priorities = { 10, 20, 40 };
state int concurrency = 25;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock by taking concurrency locks at each level
state std::vector<Future<PriorityMultiLock::Lock>> lockFutures;
for (int i = 0; i < priorities.size(); ++i) {
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(i));
}
}
// Wait for n = concurrency locks to be acquired
wait(quorum(lockFutures, concurrency));
state std::vector<Future<Void>> futures;
for (int i = 0; i < 10e3; ++i) {
int p = i % priorities.size();
futures.push_back(waitLockIncrement(pml, p, &counts[p]));
}
state Future<Void> f = waitForAll(futures);
// Release the locks
lockFutures.clear();
// Print stats and wait for all futures to be ready
loop {
choose {
when(wait(delay(1))) {
printf("counts: ");
for (auto c : counts) {
printf("%d ", c);
}
printf(" pml: %s\n", pml->toString().c_str());
}
when(wait(f)) { break; }
}
}
delete pml;
return Void();
}


@ -198,7 +198,8 @@ Future<Void> waitForLowInFlight(Database cx, T* workload) {
break;
}
} catch (Error& e) {
if (e.code() == error_code_attribute_not_found) {
if (e.code() == error_code_attribute_not_found ||
(e.code() == error_code_timed_out && !timeout.isReady())) {
// DD may not be initialized yet and attribute "DataInFlight" can be missing
wait(delay(1.0));
} else {


@ -37,6 +37,7 @@
#include "flow/Error.h"
#include "flow/Hash3.h"
#include "flow/Histogram.h"
#include "flow/PriorityMultiLock.actor.h"
#include "flow/IRandom.h"
#include "flow/IndexedSet.h"
#include "flow/SystemMonitor.h"
@ -1022,6 +1023,9 @@ public:
FlowLock serveFetchCheckpointParallelismLock;
PriorityMultiLock ssLock;
std::vector<int> readPriorityRanks;
int64_t instanceID;
Promise<Void> otherError;
@ -1287,13 +1291,15 @@ public:
changeFeedDiskReadsLock(SERVER_KNOBS->CHANGE_FEED_DISK_READS_PARALLELISM),
fetchKeysBytesBudget(SERVER_KNOBS->STORAGE_FETCH_BYTES), fetchKeysBudgetUsed(false),
serveFetchCheckpointParallelismLock(SERVER_KNOBS->SERVE_FETCH_CHECKPOINT_PARALLELISM),
ssLock(SERVER_KNOBS->STORAGE_SERVER_READ_CONCURRENCY, SERVER_KNOBS->STORAGESERVER_READ_PRIORITIES),
instanceID(deterministicRandom()->randomUniqueID().first()), shuttingDown(false), behind(false),
versionBehind(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), lastBytesInputEBrake(0),
lastDurableVersionEBrake(0), maxQueryQueue(0), transactionTagCounter(ssi.id()),
busiestWriteTagContext(ssi.id()), counters(this),
storageServerSourceTLogIDEventHolder(
makeReference<EventCacheHolder>(ssi.id().toString() + "/StorageServerSourceTLogID")) {
readPriorityRanks = parseStringToVector<int>(SERVER_KNOBS->STORAGESERVER_READ_RANKS, ',');
ASSERT(readPriorityRanks.size() > (int)ReadType::MAX);
version.initMetric("StorageServer.Version"_sr, counters.cc.id);
oldestVersion.initMetric("StorageServer.OldestVersion"_sr, counters.cc.id);
durableVersion.initMetric("StorageServer.DurableVersion"_sr, counters.cc.id);
@ -1850,6 +1856,7 @@ std::vector<StorageServerShard> StorageServer::getStorageServerShards(KeyRangeRe
ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
state int64_t resultSize = 0;
state PriorityMultiLock::Lock lock;
Span span("SS:getValue"_loc, req.spanContext);
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -1858,6 +1865,8 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// Temporarily disabled -- this path is hit a lot
// getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first();
state ReadType type = req.options.present() ? req.options.get().type : ReadType::NORMAL;
try {
++data->counters.getValueQueries;
++data->counters.allQueries;
@ -1868,6 +1877,8 @@ ACTOR Future<Void> getValueQ(StorageServer* data, GetValueRequest req) {
// so we need to downgrade here
wait(data->getQueryDelay());
wait(store(lock, data->ssLock.lock(data->readPriorityRanks[(int)type])));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -3721,6 +3732,7 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
state Span span("SS:getKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<ReadOptions> options = req.options;
state ReadType type = options.present() ? options.get().type : ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -3735,12 +3747,13 @@ ACTOR Future<Void> getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req)
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(data->getQueryDelay());
wait(data->getQueryDelay());
if (!SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && type == ReadType::FETCH) {
type = ReadType::NORMAL;
}
state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[(int)type]));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -4465,6 +4478,7 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
state Span span("SS:getMappedKeyValues"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<ReadOptions> options = req.options;
state ReadType type = options.present() ? options.get().type : ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4479,12 +4493,13 @@ ACTOR Future<Void> getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(data->getQueryDelay());
wait(data->getQueryDelay());
if (!SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && type == ReadType::FETCH) {
type = ReadType::NORMAL;
}
state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(data->readPriorityRanks[(int)type]));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -4681,6 +4696,7 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
state Span span("SS:getKeyValuesStream"_loc, req.spanContext);
state int64_t resultSize = 0;
state Optional<ReadOptions> options = req.options;
state ReadType type = options.present() ? options.get().type : ReadType::NORMAL;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
@ -4694,12 +4710,14 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
// Active load balancing runs at a very high priority (to obtain accurate queue lengths)
// so we need to downgrade here
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() && req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(delay(0, TaskPriority::DefaultEndpoint));
wait(delay(0, TaskPriority::DefaultEndpoint));
if (!SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && type == ReadType::FETCH) {
type = ReadType::NORMAL;
}
state int readPriority = data->readPriorityRanks[(int)type];
state PriorityMultiLock::Lock lock = wait(data->ssLock.lock(readPriority));
try {
if (req.options.present() && req.options.get().debugID.present())
g_traceBatch.addEvent(
@ -4870,12 +4888,8 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
end = lastKey;
}
if (SERVER_KNOBS->FETCH_KEYS_LOWER_PRIORITY && req.options.present() &&
req.options.get().type == ReadType::FETCH) {
wait(delay(0, TaskPriority::FetchKeys));
} else {
wait(delay(0, TaskPriority::DefaultEndpoint));
}
lock.release();
wait(store(lock, data->ssLock.lock(readPriority)));
data->transactionTagCounter.addRequest(req.tags, resultSize);
}
@ -4896,14 +4910,19 @@ ACTOR Future<Void> getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe
ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
state Span span("SS:getKey"_loc, req.spanContext);
state PriorityMultiLock::Lock lock;
if (req.tenantInfo.name.present()) {
span.addAttribute("tenant"_sr, req.tenantInfo.name.get());
}
state int64_t resultSize = 0;
state ReadOptions options;
state ReadType type = ReadType::NORMAL;
if (req.options.present()) {
options = req.options.get();
type = options.type;
}
getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID;
++data->counters.getKeyQueries;
@ -4915,6 +4934,8 @@ ACTOR Future<Void> getKeyQ(StorageServer* data, GetKeyRequest req) {
// so we need to downgrade here
wait(data->getQueryDelay());
wait(store(lock, data->ssLock.lock(data->readPriorityRanks[(int)type])));
// Track time from requestTime through now as read queueing wait time
state double queueWaitEnd = g_network->timer();
data->counters.readQueueWaitSample.addMeasurement(queueWaitEnd - req.requestTime());
@ -6469,11 +6490,8 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
state int debug_nextRetryToLog = 1;
state Error lastError;
// TODO: update to FETCH once the priority multi lock is used.
// leaving the readtype off for now to prevent data fetches stall under heavy load
// it is used to inform the storage that the rangeRead is for Fetch
// state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::FETCH);
state ReadOptions options = ReadOptions(Optional<UID>(), ReadType::NORMAL);
state ReadOptions options = ReadOptions(fetchKeysID, ReadType::FETCH);
// FIXME: The client cache does not notice when servers are added to a team. To read from a local storage server
// we must refresh the cache manually.
@ -9824,6 +9842,21 @@ ACTOR Future<Void> metricsCore(StorageServer* self, StorageServerInterface ssi)
[self = self](TraceEvent& te) {
te.detail("StorageEngine", self->storage.getKeyValueStoreType().toString());
te.detail("Tag", self->tag.toString());
std::vector<int> rpr = self->readPriorityRanks;
te.detail("ReadsActive", self->ssLock.totalRunners());
te.detail("ReadsWaiting", self->ssLock.totalWaiters());
int type = (int)ReadType::FETCH;
te.detail("ReadFetchActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadFetchWaiting", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::LOW;
te.detail("ReadLowActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadLowWaiting", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::NORMAL;
te.detail("ReadNormalActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadNormalWaiting", self->ssLock.numWaiters(rpr[type]));
type = (int)ReadType::HIGH;
te.detail("ReadHighActive", self->ssLock.numRunners(rpr[type]));
te.detail("ReadHighWaiting", self->ssLock.numWaiters(rpr[type]));
StorageBytes sb = self->storage.getStorageBytes();
te.detail("KvstoreBytesUsed", sb.used);
te.detail("KvstoreBytesFree", sb.free);
@ -10680,6 +10713,9 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
// If the storage server dies while something that uses self is still on the stack,
// we want that actor to complete before we terminate and that memory goes out of scope
self.ssLock.kill();
state Error err = e;
if (storageServerTerminated(self, persistentData, err)) {
ssCore.cancel();
@ -10800,6 +10836,9 @@ ACTOR Future<Void> storageServer(IKeyValueStore* persistentData,
throw internal_error();
} catch (Error& e) {
self.ssLock.kill();
if (self.byteSampleRecovery.isValid()) {
self.byteSampleRecovery.cancel();
}


@ -248,6 +248,10 @@ struct ConfigureDatabaseWorkload : TestWorkload {
std::string description() const override { return "DestroyDatabaseWorkload"; }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
out.insert("MachineAttritionWorkload");
}
Future<Void> setup(Database const& cx) override { return _setup(cx, this); }
Future<Void> start(Database const& cx) override { return _start(this, cx); }


@ -40,6 +40,9 @@ struct KillRegionWorkload : TestWorkload {
}
std::string description() const override { return "KillRegionWorkload"; }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override { out.insert("all"); }
Future<Void> setup(Database const& cx) override {
if (enabled) {
return _setup(this, cx);


@ -81,6 +81,10 @@ struct MetaclusterManagementWorkload : TestWorkload {
std::string description() const override { return "MetaclusterManagement"; }
void disableFailureInjectionWorkloads(std::set<std::string>& out) const override {
out.insert("MachineAttritionWorkload");
}
Future<Void> setup(Database const& cx) override {
if (clientId == 0) {
if (g_network->isSimulated() && BUGGIFY) {


@ -22,6 +22,7 @@
#include <utility>
#include <vector>
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
@ -377,6 +378,8 @@ struct ReadWriteWorkload : ReadWriteCommon {
bool adjacentReads; // keys are adjacent within a transaction
bool adjacentWrites;
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
int readType;
bool cacheResult;
Optional<Key> transactionTag;
int transactionsTagThrottled{ 0 };
@ -399,6 +402,8 @@ struct ReadWriteWorkload : ReadWriteCommon {
rampUpConcurrency = getOption(options, "rampUpConcurrency"_sr, false);
batchPriority = getOption(options, "batchPriority"_sr, false);
descriptionString = getOption(options, "description"_sr, "ReadWrite"_sr);
readType = getOption(options, "readType"_sr, 3);
cacheResult = getOption(options, "cacheResult"_sr, true);
if (hasOption(options, "transactionTag"_sr)) {
transactionTag = getOption(options, "transactionTag"_sr, ""_sr);
}
@ -428,6 +433,10 @@ struct ReadWriteWorkload : ReadWriteCommon {
if (transactionTag.present() && tr.getTags().size() == 0) {
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, transactionTag.get());
}
ReadOptions options;
options.type = static_cast<ReadType>(readType);
options.cacheResult = cacheResult;
tr.getTransaction().trState->readOptions = options;
}
std::string description() const override { return descriptionString.toString(); }
@ -503,7 +512,6 @@ struct ReadWriteWorkload : ReadWriteCommon {
state double startTime = now();
loop {
state Transaction tr(cx);
try {
self->setupTransaction(tr);
wait(self->readOp(&tr, keys, self, false));


@ -223,8 +223,8 @@ struct TenantEntryCacheWorkload : TestWorkload {
CLIENT_KNOBS->TENANT_ENTRY_CACHE_LIST_REFRESH_INTERVAL * 10; // initial delay + multiple refresh runs
wait(delay(refreshWait));
// InitRefresh + multiple timer based invocations
ASSERT_GE(cache->numCacheRefreshes(), 3);
// InitRefresh + multiple timer based invocations (at least 2 invocations of cache->refresh())
ASSERT_GE(cache->numCacheRefreshes(), 2);
TraceEvent("TestCacheRefreshEnd");
return Void();


@ -253,9 +253,9 @@ struct TenantManagementWorkload : TestWorkload {
return tenant;
}
Optional<TenantGroupName> chooseTenantGroup(bool allowSystemTenantGroup) {
Optional<TenantGroupName> chooseTenantGroup(bool allowSystemTenantGroup, bool allowEmptyGroup = true) {
Optional<TenantGroupName> tenantGroup;
if (deterministicRandom()->coinflip()) {
if (!allowEmptyGroup || deterministicRandom()->coinflip()) {
tenantGroup = TenantGroupNameRef(format("%s%08d",
localTenantGroupNamePrefix.toString().c_str(),
deterministicRandom()->randomInt(0, maxTenantGroups)));
@ -276,10 +276,10 @@ struct TenantManagementWorkload : TestWorkload {
}
// Creates tenant(s) using the specified operation type
ACTOR static Future<Void> createImpl(Reference<ReadYourWritesTransaction> tr,
std::map<TenantName, TenantMapEntry> tenantsToCreate,
OperationType operationType,
TenantManagementWorkload* self) {
ACTOR static Future<Void> createTenantImpl(Reference<ReadYourWritesTransaction> tr,
std::map<TenantName, TenantMapEntry> tenantsToCreate,
OperationType operationType,
TenantManagementWorkload* self) {
if (operationType == OperationType::SPECIAL_KEYS) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (auto [tenant, entry] : tenantsToCreate) {
@ -387,7 +387,7 @@ struct TenantManagementWorkload : TestWorkload {
}
try {
Optional<Void> result = wait(timeout(createImpl(tr, tenantsToCreate, operationType, self),
Optional<Void> result = wait(timeout(createTenantImpl(tr, tenantsToCreate, operationType, self),
deterministicRandom()->randomInt(1, 30)));
if (result.present()) {
@ -574,12 +574,12 @@ struct TenantManagementWorkload : TestWorkload {
}
// Deletes the tenant or tenant range using the specified operation type
ACTOR static Future<Void> deleteImpl(Reference<ReadYourWritesTransaction> tr,
TenantName beginTenant,
Optional<TenantName> endTenant,
std::vector<TenantName> tenants,
OperationType operationType,
TenantManagementWorkload* self) {
ACTOR static Future<Void> deleteTenantImpl(Reference<ReadYourWritesTransaction> tr,
TenantName beginTenant,
Optional<TenantName> endTenant,
std::vector<TenantName> tenants,
OperationType operationType,
TenantManagementWorkload* self) {
state int tenantIndex;
if (operationType == OperationType::SPECIAL_KEYS) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
@ -718,7 +718,7 @@ struct TenantManagementWorkload : TestWorkload {
try {
state Version beforeVersion = wait(self->getReadVersion(tr));
Optional<Void> result =
wait(timeout(deleteImpl(tr, beginTenant, endTenant, tenants, operationType, self),
wait(timeout(deleteTenantImpl(tr, beginTenant, endTenant, tenants, operationType, self),
deterministicRandom()->randomInt(1, 30)));
if (result.present()) {
@ -936,10 +936,10 @@ struct TenantManagementWorkload : TestWorkload {
}
// Gets the metadata for a tenant using the specified operation type
ACTOR static Future<TenantMapEntry> getImpl(Reference<ReadYourWritesTransaction> tr,
TenantName tenant,
OperationType operationType,
TenantManagementWorkload* self) {
ACTOR static Future<TenantMapEntry> getTenantImpl(Reference<ReadYourWritesTransaction> tr,
TenantName tenant,
OperationType operationType,
TenantManagementWorkload* self) {
state TenantMapEntry entry;
if (operationType == OperationType::SPECIAL_KEYS) {
Key key = self->specialKeysTenantMapPrefix.withSuffix(tenant);
@ -949,15 +949,12 @@ struct TenantManagementWorkload : TestWorkload {
}
entry = TenantManagementWorkload::jsonToTenantMapEntry(value.get());
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
TenantMapEntry _entry = wait(TenantAPI::getTenant(self->dataDb.getReference(), tenant));
entry = _entry;
wait(store(entry, TenantAPI::getTenant(self->dataDb.getReference(), tenant)));
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
TenantMapEntry _entry = wait(TenantAPI::getTenantTransaction(tr, tenant));
entry = _entry;
wait(store(entry, TenantAPI::getTenantTransaction(tr, tenant)));
} else {
TenantMapEntry _entry = wait(MetaclusterAPI::getTenant(self->mvDb, tenant));
entry = _entry;
wait(store(entry, MetaclusterAPI::getTenant(self->mvDb, tenant)));
}
return entry;
@ -977,7 +974,7 @@ struct TenantManagementWorkload : TestWorkload {
loop {
try {
// Get the tenant metadata and check that it matches our local state
state TenantMapEntry entry = wait(getImpl(tr, tenant, operationType, self));
state TenantMapEntry entry = wait(getTenantImpl(tr, tenant, operationType, self));
ASSERT(alreadyExists);
ASSERT(entry.id == tenantData.id);
ASSERT(entry.tenantGroup == tenantData.tenantGroup);
@ -1014,7 +1011,7 @@ struct TenantManagementWorkload : TestWorkload {
}
// Gets a list of tenants using the specified operation type
ACTOR static Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listImpl(
ACTOR static Future<std::vector<std::pair<TenantName, TenantMapEntry>>> listTenantsImpl(
Reference<ReadYourWritesTransaction> tr,
TenantName beginTenant,
TenantName endTenant,
@ -1031,18 +1028,12 @@ struct TenantManagementWorkload : TestWorkload {
TenantManagementWorkload::jsonToTenantMapEntry(result.value)));
}
} else if (operationType == OperationType::MANAGEMENT_DATABASE) {
std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
wait(TenantAPI::listTenants(self->dataDb.getReference(), beginTenant, endTenant, limit));
tenants = _tenants;
wait(store(tenants, TenantAPI::listTenants(self->dataDb.getReference(), beginTenant, endTenant, limit)));
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
wait(TenantAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit));
tenants = _tenants;
wait(store(tenants, TenantAPI::listTenantsTransaction(tr, beginTenant, endTenant, limit)));
} else {
std::vector<std::pair<TenantName, TenantMapEntry>> _tenants =
wait(MetaclusterAPI::listTenants(self->mvDb, beginTenant, endTenant, limit));
tenants = _tenants;
wait(store(tenants, MetaclusterAPI::listTenants(self->mvDb, beginTenant, endTenant, limit)));
}
return tenants;
@ -1064,7 +1055,7 @@ struct TenantManagementWorkload : TestWorkload {
try {
// Attempt to read the chosen list of tenants
state std::vector<std::pair<TenantName, TenantMapEntry>> tenants =
wait(listImpl(tr, beginTenant, endTenant, limit, operationType, self));
wait(listTenantsImpl(tr, beginTenant, endTenant, limit, operationType, self));
// Attempting to read the list of tenants using the metacluster API in a non-metacluster should
// return nothing in this test
@ -1154,13 +1145,13 @@ struct TenantManagementWorkload : TestWorkload {
return Void();
}
ACTOR static Future<Void> renameImpl(Reference<ReadYourWritesTransaction> tr,
OperationType operationType,
std::map<TenantName, TenantName> tenantRenames,
bool tenantNotFound,
bool tenantExists,
bool tenantOverlap,
TenantManagementWorkload* self) {
ACTOR static Future<Void> renameTenantImpl(Reference<ReadYourWritesTransaction> tr,
OperationType operationType,
std::map<TenantName, TenantName> tenantRenames,
bool tenantNotFound,
bool tenantExists,
bool tenantOverlap,
TenantManagementWorkload* self) {
if (operationType == OperationType::SPECIAL_KEYS) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (auto& iter : tenantRenames) {
@ -1233,7 +1224,8 @@ struct TenantManagementWorkload : TestWorkload {
loop {
try {
wait(renameImpl(tr, operationType, tenantRenames, tenantNotFound, tenantExists, tenantOverlap, self));
wait(renameTenantImpl(
tr, operationType, tenantRenames, tenantNotFound, tenantExists, tenantOverlap, self));
wait(verifyTenantRenames(self, tenantRenames));
// Check that using the wrong rename API fails depending on whether we are using a metacluster
ASSERT(self->useMetacluster == (operationType == OperationType::METACLUSTER));
@ -1287,12 +1279,12 @@ struct TenantManagementWorkload : TestWorkload {
}
// Changes the configuration of a tenant
ACTOR static Future<Void> configureImpl(Reference<ReadYourWritesTransaction> tr,
TenantName tenant,
std::map<Standalone<StringRef>, Optional<Value>> configParameters,
OperationType operationType,
bool specialKeysUseInvalidTuple,
TenantManagementWorkload* self) {
ACTOR static Future<Void> configureTenantImpl(Reference<ReadYourWritesTransaction> tr,
TenantName tenant,
std::map<Standalone<StringRef>, Optional<Value>> configParameters,
OperationType operationType,
bool specialKeysUseInvalidTuple,
TenantManagementWorkload* self) {
if (operationType == OperationType::SPECIAL_KEYS) {
tr->setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES);
for (auto const& [config, value] : configParameters) {
@ -1372,7 +1364,7 @@ struct TenantManagementWorkload : TestWorkload {
loop {
try {
wait(configureImpl(tr, tenant, configuration, operationType, specialKeysUseInvalidTuple, self));
wait(configureTenantImpl(tr, tenant, configuration, operationType, specialKeysUseInvalidTuple, self));
ASSERT(exists);
ASSERT(!hasInvalidOption);
@ -1421,6 +1413,164 @@ struct TenantManagementWorkload : TestWorkload {
}
}
// Gets the metadata for a tenant group using the specified operation type
ACTOR static Future<Optional<TenantGroupEntry>> getTenantGroupImpl(Reference<ReadYourWritesTransaction> tr,
TenantGroupName tenant,
OperationType operationType,
TenantManagementWorkload* self) {
state Optional<TenantGroupEntry> entry;
if (operationType == OperationType::MANAGEMENT_DATABASE) {
wait(store(entry, TenantAPI::tryGetTenantGroup(self->dataDb.getReference(), tenant)));
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION ||
operationType == OperationType::SPECIAL_KEYS) {
// There is no special-keys interface for reading tenant groups currently, so read them
// using the TenantAPI.
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
wait(store(entry, TenantAPI::tryGetTenantGroupTransaction(tr, tenant)));
} else {
wait(store(entry, MetaclusterAPI::tryGetTenantGroup(self->mvDb, tenant)));
}
return entry;
}
ACTOR static Future<Void> getTenantGroup(TenantManagementWorkload* self) {
state TenantGroupName tenantGroup = self->chooseTenantGroup(true, false).get();
state OperationType operationType = self->randomOperationType();
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
// True if the tenant group should exist and return a result
auto itr = self->createdTenantGroups.find(tenantGroup);
state bool alreadyExists = itr != self->createdTenantGroups.end() &&
!(operationType == OperationType::METACLUSTER && !self->useMetacluster);
loop {
try {
// Get the tenant group metadata and check that it matches our local state
state Optional<TenantGroupEntry> entry = wait(getTenantGroupImpl(tr, tenantGroup, operationType, self));
ASSERT(alreadyExists == entry.present());
if (entry.present()) {
ASSERT(entry.get().assignedCluster.present() == (operationType == OperationType::METACLUSTER));
}
return Void();
} catch (Error& e) {
state bool retry = false;
state Error error = e;
// Transaction-based operations should retry
if (operationType == OperationType::MANAGEMENT_TRANSACTION ||
operationType == OperationType::SPECIAL_KEYS) {
try {
wait(tr->onError(e));
retry = true;
} catch (Error& e) {
error = e;
retry = false;
}
}
if (!retry) {
TraceEvent(SevError, "GetTenantGroupFailure").error(error).detail("TenantGroupName", tenantGroup);
return Void();
}
}
}
}
// Gets a list of tenant groups using the specified operation type
ACTOR static Future<std::vector<std::pair<TenantGroupName, TenantGroupEntry>>> listTenantGroupsImpl(
Reference<ReadYourWritesTransaction> tr,
TenantGroupName beginTenantGroup,
TenantGroupName endTenantGroup,
int limit,
OperationType operationType,
TenantManagementWorkload* self) {
state std::vector<std::pair<TenantGroupName, TenantGroupEntry>> tenantGroups;
if (operationType == OperationType::MANAGEMENT_DATABASE) {
wait(store(
tenantGroups,
TenantAPI::listTenantGroups(self->dataDb.getReference(), beginTenantGroup, endTenantGroup, limit)));
} else if (operationType == OperationType::MANAGEMENT_TRANSACTION ||
operationType == OperationType::SPECIAL_KEYS) {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
wait(store(tenantGroups,
TenantAPI::listTenantGroupsTransaction(tr, beginTenantGroup, endTenantGroup, limit)));
} else {
wait(store(tenantGroups,
MetaclusterAPI::listTenantGroups(self->mvDb, beginTenantGroup, endTenantGroup, limit)));
}
return tenantGroups;
}
ACTOR static Future<Void> listTenantGroups(TenantManagementWorkload* self) {
state TenantGroupName beginTenantGroup = self->chooseTenantGroup(false, false).get();
state TenantGroupName endTenantGroup = self->chooseTenantGroup(false, false).get();
state int limit = std::min(CLIENT_KNOBS->MAX_TENANTS_PER_CLUSTER + 1,
deterministicRandom()->randomInt(1, self->maxTenants * 2));
state OperationType operationType = self->randomOperationType();
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(self->dataDb);
if (beginTenantGroup > endTenantGroup) {
std::swap(beginTenantGroup, endTenantGroup);
}
loop {
try {
// Attempt to read the chosen list of tenant groups
state std::vector<std::pair<TenantGroupName, TenantGroupEntry>> tenantGroups =
wait(listTenantGroupsImpl(tr, beginTenantGroup, endTenantGroup, limit, operationType, self));
// Attempting to read the list of tenant groups using the metacluster API in a non-metacluster should
// return nothing in this test
if (operationType == OperationType::METACLUSTER && !self->useMetacluster) {
ASSERT(tenantGroups.size() == 0);
return Void();
}
ASSERT(tenantGroups.size() <= limit);
// Compare the resulting tenant group list to the list we expected to get
auto localItr = self->createdTenantGroups.lower_bound(beginTenantGroup);
auto tenantMapItr = tenantGroups.begin();
for (; tenantMapItr != tenantGroups.end(); ++tenantMapItr, ++localItr) {
ASSERT(localItr != self->createdTenantGroups.end());
ASSERT(localItr->first == tenantMapItr->first);
}
// Make sure the list terminated at the right spot
ASSERT(tenantGroups.size() == limit || localItr == self->createdTenantGroups.end() ||
localItr->first >= endTenantGroup);
return Void();
} catch (Error& e) {
state bool retry = false;
state Error error = e;
// Transaction-based operations need to be retried
if (operationType == OperationType::MANAGEMENT_TRANSACTION ||
operationType == OperationType::SPECIAL_KEYS) {
try {
retry = true;
wait(tr->onError(e));
} catch (Error& e) {
error = e;
retry = false;
}
}
if (!retry) {
TraceEvent(SevError, "ListTenantGroupFailure")
.error(error)
.detail("BeginTenant", beginTenantGroup)
.detail("EndTenant", endTenantGroup);
return Void();
}
}
}
}
Future<Void> start(Database const& cx) override {
if (clientId == 0 || !singleClient) {
return _start(cx, this);
@ -1434,7 +1584,7 @@ struct TenantManagementWorkload : TestWorkload {
// Run a random sequence of tenant management operations for the duration of the test
while (now() < start + self->testDuration) {
state int operation = deterministicRandom()->randomInt(0, 6);
state int operation = deterministicRandom()->randomInt(0, 8);
if (operation == 0) {
wait(createTenant(self));
} else if (operation == 1) {
@ -1447,6 +1597,10 @@ struct TenantManagementWorkload : TestWorkload {
wait(renameTenant(self));
} else if (operation == 5) {
wait(configureTenant(self));
} else if (operation == 6) {
wait(getTenantGroup(self));
} else if (operation == 7) {
wait(listTenantGroups(self));
}
}

View File

@ -38,6 +38,14 @@ target_link_libraries(flowlinktest PRIVATE flow stacktrace)
# message(STATUS "ZLIB package not found")
#endif()
# TODO: Limit ZSTD availability to CLANG; for gcc, resolve the library link ordering
if (CLANG)
include(CompileZstd)
compile_zstd()
target_link_libraries(flow PUBLIC ZSTD::ZSTD)
target_compile_definitions(flow PUBLIC ZSTD_LIB_SUPPORTED)
endif()
foreach(ft flow flow_sampling flowlinktest)
target_include_directories(${ft} PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include")

View File

@ -21,6 +21,7 @@
#include "flow/CompressionUtils.h"
#include "flow/Arena.h"
#include "flow/Error.h"
#include "flow/IRandom.h"
#include "flow/UnitTest.h"
@ -29,9 +30,32 @@
#include <boost/iostreams/filter/gzip.hpp>
#endif
#include <boost/iostreams/filtering_streambuf.hpp>
#ifdef ZSTD_LIB_SUPPORTED
#include <boost/iostreams/filter/zstd.hpp>
#endif
#include <sstream>
namespace {
std::unordered_set<CompressionFilter> getSupportedFilters() {
std::unordered_set<CompressionFilter> filters;
filters.insert(CompressionFilter::NONE);
#ifdef ZLIB_LIB_SUPPORTED
filters.insert(CompressionFilter::GZIP);
#endif
#ifdef ZSTD_LIB_SUPPORTED
filters.insert(CompressionFilter::ZSTD);
#endif
ASSERT_GE(filters.size(), 1);
return filters;
}
} // namespace
std::unordered_set<CompressionFilter> CompressionUtils::supportedFilters = getSupportedFilters();
StringRef CompressionUtils::compress(const CompressionFilter filter, const StringRef& data, Arena& arena) {
checkFilterSupported(filter);
if (filter == CompressionFilter::NONE) {
return StringRef(arena, data);
}
@ -42,11 +66,17 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin
return CompressionUtils::compress(filter, data, bio::gzip::default_compression, arena);
}
#endif
throw not_implemented();
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
return CompressionUtils::compress(filter, data, bio::zstd::default_compression, arena);
}
#endif
throw internal_error(); // We should never get here
}
StringRef CompressionUtils::compress(const CompressionFilter filter, const StringRef& data, int level, Arena& arena) {
ASSERT(filter < CompressionFilter::LAST);
checkFilterSupported(filter);
if (filter == CompressionFilter::NONE) {
return StringRef(arena, data);
@ -62,6 +92,12 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin
out.push(bio::gzip_compressor(bio::gzip_params(level)));
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
out.push(bio::zstd_compressor(bio::zstd_params(level)));
}
#endif
out.push(decomStream);
bio::copy(out, compStream);
@ -69,7 +105,7 @@ StringRef CompressionUtils::compress(const CompressionFilter filter, const Strin
}
StringRef CompressionUtils::decompress(const CompressionFilter filter, const StringRef& data, Arena& arena) {
ASSERT(filter < CompressionFilter::LAST);
checkFilterSupported(filter);
if (filter == CompressionFilter::NONE) {
return StringRef(arena, data);
@ -85,15 +121,102 @@ StringRef CompressionUtils::decompress(const CompressionFilter filter, const Str
out.push(bio::gzip_decompressor());
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
out.push(bio::zstd_decompressor());
}
#endif
out.push(compStream);
bio::copy(out, decompStream);
return StringRef(arena, decompStream.str());
}
int CompressionUtils::getDefaultCompressionLevel(CompressionFilter filter) {
checkFilterSupported(filter);
if (filter == CompressionFilter::NONE) {
return -1;
}
#ifdef ZLIB_LIB_SUPPORTED
if (filter == CompressionFilter::GZIP) {
// Opt for high-speed compression; according to benchmarks, larger levels have a high CPU cost with
// little compression ratio improvement.
// return boost::iostreams::gzip::default_compression;
// return boost::iostreams::gzip::best_compression;
return boost::iostreams::gzip::best_speed;
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
if (filter == CompressionFilter::ZSTD) {
// Opt for high-speed compression; according to benchmarks, larger levels have a high CPU cost with
// little compression ratio improvement.
// return boost::iostreams::zstd::default_compression;
// return boost::iostreams::zstd::best_compression;
return boost::iostreams::zstd::best_speed;
}
#endif
throw internal_error(); // We should never get here
}
CompressionFilter CompressionUtils::getRandomFilter() {
ASSERT_GE(supportedFilters.size(), 1);
std::vector<CompressionFilter> filters;
filters.insert(filters.end(), CompressionUtils::supportedFilters.begin(), CompressionUtils::supportedFilters.end());
ASSERT_GE(filters.size(), 1);
CompressionFilter res;
if (filters.size() == 1) {
res = filters[0];
} else {
int idx = deterministicRandom()->randomInt(0, filters.size());
res = filters[idx];
}
ASSERT(supportedFilters.find(res) != supportedFilters.end());
return res;
}
// Only used to link unit tests
void forceLinkCompressionUtilsTest() {}
namespace {
void testCompression(CompressionFilter filter) {
Arena arena;
const int size = deterministicRandom()->randomInt(512, 1024);
Standalone<StringRef> uncompressed = makeString(size);
deterministicRandom()->randomBytes(mutateString(uncompressed), size);
Standalone<StringRef> compressed = CompressionUtils::compress(filter, uncompressed, arena);
ASSERT_NE(compressed.compare(uncompressed), 0);
StringRef verify = CompressionUtils::decompress(filter, compressed, arena);
ASSERT_EQ(verify.compare(uncompressed), 0);
}
void testCompression2(CompressionFilter filter) {
Arena arena;
const int size = deterministicRandom()->randomInt(512, 1024);
std::string s(size, 'x');
Standalone<StringRef> uncompressed = Standalone<StringRef>(StringRef(s));
printf("Size before: %d\n", (int)uncompressed.size());
Standalone<StringRef> compressed = CompressionUtils::compress(filter, uncompressed, arena);
ASSERT_NE(compressed.compare(uncompressed), 0);
printf("Size after: %d\n", (int)compressed.size());
// Assert compressed size is less than half.
ASSERT(compressed.size() * 2 < uncompressed.size());
StringRef verify = CompressionUtils::decompress(filter, compressed, arena);
ASSERT_EQ(verify.compare(uncompressed), 0);
}
} // namespace
TEST_CASE("/CompressionUtils/noCompression") {
Arena arena;
const int size = deterministicRandom()->randomInt(512, 1024);
@ -106,46 +229,38 @@ TEST_CASE("/CompressionUtils/noCompression") {
StringRef verify = CompressionUtils::decompress(CompressionFilter::NONE, compressed, arena);
ASSERT_EQ(verify.compare(uncompressed), 0);
TraceEvent("NoCompression_Done").log();
TraceEvent("NoCompressionDone");
return Void();
}
#ifdef ZLIB_LIB_SUPPORTED
TEST_CASE("/CompressionUtils/gzipCompression") {
Arena arena;
const int size = deterministicRandom()->randomInt(512, 1024);
Standalone<StringRef> uncompressed = makeString(size);
deterministicRandom()->randomBytes(mutateString(uncompressed), size);
Standalone<StringRef> compressed = CompressionUtils::compress(CompressionFilter::GZIP, uncompressed, arena);
ASSERT_NE(compressed.compare(uncompressed), 0);
StringRef verify = CompressionUtils::decompress(CompressionFilter::GZIP, compressed, arena);
ASSERT_EQ(verify.compare(uncompressed), 0);
TraceEvent("GzipCompression_Done").log();
testCompression(CompressionFilter::GZIP);
TraceEvent("GzipCompressionDone");
return Void();
}
TEST_CASE("/CompressionUtils/gzipCompression2") {
Arena arena;
const int size = deterministicRandom()->randomInt(512, 1024);
std::string s(size, 'x');
Standalone<StringRef> uncompressed = Standalone<StringRef>(StringRef(s));
printf("Size before: %d\n", (int)uncompressed.size());
Standalone<StringRef> compressed = CompressionUtils::compress(CompressionFilter::GZIP, uncompressed, arena);
ASSERT_NE(compressed.compare(uncompressed), 0);
printf("Size after: %d\n", (int)compressed.size());
// Assert compressed size is less than half.
ASSERT(compressed.size() * 2 < uncompressed.size());
StringRef verify = CompressionUtils::decompress(CompressionFilter::GZIP, compressed, arena);
ASSERT_EQ(verify.compare(uncompressed), 0);
TraceEvent("GzipCompression_Done").log();
testCompression2(CompressionFilter::GZIP);
TraceEvent("GzipCompression2Done");
return Void();
}
#endif
#ifdef ZSTD_LIB_SUPPORTED
TEST_CASE("/CompressionUtils/zstdCompression") {
testCompression(CompressionFilter::ZSTD);
TraceEvent("ZstdCompressionDone");
return Void();
}
TEST_CASE("/CompressionUtils/zstdCompression2") {
testCompression2(CompressionFilter::ZSTD);
TraceEvent("ZstdCompression2Done");
return Void();
}

View File

@ -24,11 +24,12 @@
#include "flow/Arena.h"
#include <unordered_set>
enum class CompressionFilter {
NONE,
#ifdef ZLIB_LIB_SUPPORTED
GZIP,
#endif
ZSTD,
LAST // Always the last member
};
@ -37,16 +38,17 @@ struct CompressionUtils {
static StringRef compress(const CompressionFilter filter, const StringRef& data, int level, Arena& arena);
static StringRef decompress(const CompressionFilter filter, const StringRef& data, Arena& arena);
static int getDefaultCompressionLevel(CompressionFilter filter);
static CompressionFilter getRandomFilter();
static CompressionFilter fromFilterString(const std::string& filter) {
if (filter == "NONE") {
return CompressionFilter::NONE;
}
#ifdef ZLIB_LIB_SUPPORTED
else if (filter == "GZIP") {
} else if (filter == "GZIP") {
return CompressionFilter::GZIP;
}
#endif
else {
} else if (filter == "ZSTD") {
return CompressionFilter::ZSTD;
} else {
throw not_implemented();
}
}
@ -54,16 +56,22 @@ struct CompressionUtils {
static std::string toString(const CompressionFilter filter) {
if (filter == CompressionFilter::NONE) {
return "NONE";
}
#ifdef ZLIB_LIB_SUPPORTED
else if (filter == CompressionFilter::GZIP) {
} else if (filter == CompressionFilter::GZIP) {
return "GZP";
}
#endif
else {
} else if (filter == CompressionFilter::ZSTD) {
return "ZSTD";
} else {
throw not_implemented();
}
}
static void checkFilterSupported(const CompressionFilter filter) {
if (CompressionUtils::supportedFilters.find(filter) == CompressionUtils::supportedFilters.end()) {
throw not_implemented();
}
}
static std::unordered_set<CompressionFilter> supportedFilters;
};
#endif // FLOW_COMPRESSION_UTILS_H

View File

@ -0,0 +1,358 @@
/*
* PriorityMultiLock.actor.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
// When actually compiled (NO_INTELLISENSE), include the generated version of this file. In intellisense use the source
// version.
#if defined(NO_INTELLISENSE) && !defined(FLOW_PRIORITYMULTILOCK_ACTOR_G_H)
#define FLOW_PRIORITYMULTILOCK_ACTOR_G_H
#include "flow/PriorityMultiLock.actor.g.h"
#elif !defined(PRIORITYMULTILOCK_ACTOR_H)
#define PRIORITYMULTILOCK_ACTOR_H
#include "flow/flow.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#define PRIORITYMULTILOCK_DEBUG 0
#if PRIORITYMULTILOCK_DEBUG
#define pml_debug_printf(...) printf(__VA_ARGS__)
#else
#define pml_debug_printf(...)
#endif
// A multi-user lock with a concurrent holder limit, where waiters request a lock with a priority
// id and are granted locks based on the total concurrency and the relative importance of the
// priority ids defined.
//
// Scheduling logic
// launchLimits[n] = configured launch limit for priority n, from the launch limits vector
// waiters[n] = the number of waiters for priority n
// runnerCounts[n] = number of runners at priority n
//
// totalActiveLaunchLimits = sum of limits for all priorities with waiters
// When waiters[n] becomes == 0, totalActiveLaunchLimits -= launchLimits[n]
// When waiters[n] becomes > 0, totalActiveLaunchLimits += launchLimits[n]
//
// The total capacity of a priority to be considered when launching tasks is
// ceil(launchLimits[n] / totalActiveLaunchLimits * concurrency)
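//
// As an illustrative example (hypothetical numbers, not taken from the code): with
// concurrency = 10 and launch limits {1, 1, 2}, if only priorities 0 and 2 currently
// have waiters, then totalActiveLaunchLimits = 1 + 2 = 3, so priority 0 may run up to
// ceil(1/3 * 10) = 4 tasks and priority 2 up to ceil(2/3 * 10) = 7.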
//
// The interface is similar to FlowMutex except that lock holders can just drop the lock to release it.
//
// Usage:
// Lock lock = wait(prioritylock.lock(priorityLevel));
// lock.release(); // Explicit release, or
// // let lock and all copies of lock go out of scope to release
class PriorityMultiLock {
public:
// Waiting on the lock returns a Lock, which is really just a Promise<Void>.
// Calling release() is not necessary; it exists in case the Lock holder wants to explicitly release
// the Lock before it goes out of scope.
struct Lock {
void release() { promise.send(Void()); }
// This is exposed in case the caller wants to use/copy it directly
Promise<Void> promise;
};
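// The launch limits may be given as a comma-separated string; for example (illustrative),
// PriorityMultiLock(8, "1,2,4") creates priorities 0-2 with launch limits 1, 2 and 4.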
PriorityMultiLock(int concurrency, std::string launchLimits)
: PriorityMultiLock(concurrency, parseStringToVector<int>(launchLimits, ',')) {}
PriorityMultiLock(int concurrency, std::vector<int> launchLimitsByPriority)
: concurrency(concurrency), available(concurrency), waiting(0), totalActiveLaunchLimits(0) {
priorities.resize(launchLimitsByPriority.size());
for (int i = 0; i < priorities.size(); ++i) {
priorities[i].launchLimit = launchLimitsByPriority[i];
}
fRunner = runner(this);
}
~PriorityMultiLock() {}
Future<Lock> lock(int priority = 0) {
Priority& p = priorities[priority];
Queue& q = p.queue;
Waiter w;
// If this priority currently has no waiters
if (q.empty()) {
// Add this priority's launch limit to totalActiveLaunchLimits
totalActiveLaunchLimits += p.launchLimit;
// If there are slots available and the priority has capacity then don't make the caller wait
if (available > 0 && p.runners < currentCapacity(p.launchLimit)) {
// Remove this priority's launch limit from the total since it will remain empty
totalActiveLaunchLimits -= p.launchLimit;
// Return a Lock to the caller
Lock lock;
addRunner(lock, &p);
pml_debug_printf("lock nowait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return lock;
}
}
q.push_back(w);
++waiting;
pml_debug_printf("lock wait line %d priority %d %s\n", __LINE__, priority, toString().c_str());
return w.lockPromise.getFuture();
}
void kill() {
for (int i = 0; i < runners.size(); ++i) {
if (!runners[i].isReady()) {
runners[i].cancel();
}
}
runners.clear();
brokenOnDestruct.sendError(broken_promise());
waiting = 0;
priorities.clear();
}
std::string toString() const {
int runnersDone = 0;
for (int i = 0; i < runners.size(); ++i) {
if (runners[i].isReady()) {
++runnersDone;
}
}
std::string s =
format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d runnersDone=%d ",
this,
concurrency,
available,
concurrency - available,
waiting,
runners.size(),
runnersDone);
for (int i = 0; i < priorities.size(); ++i) {
s += format("p%d:{%s} ", i, priorities[i].toString(this).c_str());
}
s += "}";
return s;
}
int maxPriority() const { return priorities.size() - 1; }
int totalWaiters() const { return waiting; }
int numWaiters(const unsigned int priority) const {
ASSERT(priority < priorities.size());
return priorities[priority].queue.size();
}
int totalRunners() const { return concurrency - available; }
int numRunners(const unsigned int priority) const {
ASSERT(priority < priorities.size());
return priorities[priority].runners;
}
private:
struct Waiter {
Waiter() : queuedTime(now()) {}
Promise<Lock> lockPromise;
double queuedTime;
};
// Total execution slots allowed across all priorities
int concurrency;
// Current available execution slots
int available;
// Total waiters across all priorities
int waiting;
// Sum of launch limits for all priorities with 1 or more waiters
int totalActiveLaunchLimits;
typedef Deque<Waiter> Queue;
struct Priority {
Priority() : runners(0), launchLimit(0) {}
// Queue of waiters at this priority
Queue queue;
// Number of runners at this priority
int runners;
// Configured launch limit for this priority
int launchLimit;
std::string toString(const PriorityMultiLock* pml) const {
return format("limit=%d run=%d wait=%d cap=%d",
launchLimit,
runners,
queue.size(),
queue.empty() ? 0 : pml->currentCapacity(launchLimit));
}
};
std::vector<Priority> priorities;
// Current or recent (ended) runners
Deque<Future<Void>> runners;
Future<Void> fRunner;
AsyncTrigger wakeRunner;
Promise<Void> brokenOnDestruct;
ACTOR static Future<Void> handleRelease(PriorityMultiLock* self, Future<Void> f, Priority* priority) {
try {
wait(f);
} catch (Error& e) {
}
++self->available;
priority->runners -= 1;
pml_debug_printf("lock release line %d priority %d %s\n",
__LINE__,
(int)(priority - &self->priorities.front()),
self->toString().c_str());
// If there are any waiters or if the runners array is getting large, trigger the runner loop
if (self->waiting > 0 || self->runners.size() > 1000) {
self->wakeRunner.trigger();
}
return Void();
}
void addRunner(Lock& lock, Priority* p) {
p->runners += 1;
--available;
runners.push_back(handleRelease(this, lock.promise.getFuture(), p));
}
// Current maximum running tasks for the specified priority, which must have waiters
// or the result is undefined
int currentCapacity(int launchLimit) const {
// The total concurrency allowed for this priority at present is the total concurrency times
// priority's launch limit divided by the total launch limits for all priorities with waiters.
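// e.g. (illustrative) launchLimit = 2, totalActiveLaunchLimits = 3, concurrency = 10
// yields ceil(2.0 / 3 * 10) = 7.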
return ceil((float)launchLimit / totalActiveLaunchLimits * concurrency);
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
state int sinceYield = 0;
state Future<Void> error = self->brokenOnDestruct.getFuture();
// Priority to try to run tasks from next
state int priority = 0;
loop {
pml_debug_printf(
"runner loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
// Cleanup finished runner futures at the front of the runner queue.
while (!self->runners.empty() && self->runners.front().isReady()) {
self->runners.pop_front();
}
// Wait for a runner to release its lock
pml_debug_printf(
"runner loop waitTrigger line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
wait(self->wakeRunner.onTrigger());
pml_debug_printf(
"runner loop wake line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
if (++sinceYield == 100) {
sinceYield = 0;
pml_debug_printf(
" runner waitDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
wait(delay(0));
pml_debug_printf(
" runner afterDelay line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
}
// While there are available slots and there are waiters, launch tasks
while (self->available > 0 && self->waiting > 0) {
pml_debug_printf(
" launch loop start line %d priority=%d %s\n", __LINE__, priority, self->toString().c_str());
Priority* pPriority;
// Find the next priority with waiters and capacity. There must be at least one.
loop {
// Rotate to next priority
if (++priority == self->priorities.size()) {
priority = 0;
}
pPriority = &self->priorities[priority];
pml_debug_printf(" launch loop scan line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
if (!pPriority->queue.empty() &&
pPriority->runners < self->currentCapacity(pPriority->launchLimit)) {
break;
}
}
Queue& queue = pPriority->queue;
Waiter w = queue.front();
queue.pop_front();
// If this priority is now empty, subtract its launch limit from totalActiveLaunchLimits
if (queue.empty()) {
self->totalActiveLaunchLimits -= pPriority->launchLimit;
pml_debug_printf(" emptied priority line %d priority=%d %s\n",
__LINE__,
priority,
self->toString().c_str());
}
--self->waiting;
Lock lock;
w.lockPromise.send(lock);
// Self may have been destructed during the lock callback
if (error.isReady()) {
throw error.getError();
}
// If the lock was not already released, add it to the runners future queue
if (lock.promise.canBeSet()) {
self->addRunner(lock, pPriority);
}
pml_debug_printf(" launched line %d alreadyDone=%d priority=%d %s\n",
__LINE__,
!lock.promise.canBeSet(),
priority,
self->toString().c_str());
}
}
}
};
#include "flow/unactorcompiler.h"
#endif

View File

@ -100,6 +100,21 @@ T sorted(T range) {
return range;
}
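// Splits `str` on `delim` and parses each token as a T via stream extraction.
// For example (illustrative): parseStringToVector<int>("1,2,4", ',') returns {1, 2, 4}.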
template <class T>
std::vector<T> parseStringToVector(std::string str, char delim) {
std::vector<T> result;
std::stringstream stream(str);
std::string token;
while (stream.good()) {
getline(stream, token, delim);
std::istringstream tokenStream(token);
T item{}; // value-initialize so a failed extraction yields a default value rather than garbage
tokenStream >> item;
result.push_back(item);
}
return result;
}
template <class T>
ErrorOr<T> errorOr(T t) {
return ErrorOr<T>(t);

View File

@ -0,0 +1,87 @@
/*
* BenchStream.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "benchmark/benchmark.h"
#include "flow/flow.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/PriorityMultiLock.actor.h"
#include <deque>
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR static Future<Void> benchPriorityMultiLock(benchmark::State* benchState) {
state std::vector<int> priorities;
// Set up priority list with limits 10, 20, 30, ...
while (priorities.size() < benchState->range(0)) {
priorities.push_back(10 * (priorities.size() + 1));
}
state int concurrency = priorities.size() * 10;
state PriorityMultiLock* pml = new PriorityMultiLock(concurrency, priorities);
state std::vector<int> counts;
counts.resize(priorities.size(), 0);
// Clog the lock by taking `concurrency` locks
state std::deque<Future<PriorityMultiLock::Lock>> lockFutures;
for (int j = 0; j < concurrency; ++j) {
lockFutures.push_back(pml->lock(j % priorities.size()));
}
// Wait for all of the initial locks to be taken
// This will work regardless of their priorities as there are only n = concurrency of them
wait(waitForAll(std::vector<Future<PriorityMultiLock::Lock>>(lockFutures.begin(), lockFutures.end())));
// For each iteration of the loop, one new lock user is created, for a total of
// concurrency + 1 users. The new user replaces an old one, which is then waited
// on. This will succeed regardless of the lock priorities used because, prior to
// the new user, there were only n = concurrency users, so they will all be served
// before the new user.
state int p = 0;
state int i = 0;
while (benchState->KeepRunning()) {
// Get and replace the i'th lock future with a new lock waiter
Future<PriorityMultiLock::Lock> f = lockFutures[i];
lockFutures[i] = pml->lock(p);
PriorityMultiLock::Lock lock = wait(f);
// Rotate to another priority
if (++p == priorities.size()) {
p = 0;
}
// Rotate to next lock index
if (++i == lockFutures.size()) {
i = 0;
}
}
benchState->SetItemsProcessed(static_cast<long>(benchState->iterations()));
delete pml;
return Void();
}
static void bench_priorityMultiLock(benchmark::State& benchState) {
onMainThread([&benchState]() { return benchPriorityMultiLock(&benchState); }).blockUntilReady();
}
BENCHMARK(bench_priorityMultiLock)->DenseRange(1, 8)->ReportAggregatesOnly(true);

View File

@ -402,8 +402,13 @@ if(WITH_PYTHON)
endif()
endif()
if (NOT WIN32)
if (NOT OPEN_FOR_IDE AND NOT WIN32 AND NOT USE_SANITIZER)
# setup venv for testing token-based authorization
if (APPLE)
set(ld_env_name "DYLD_LIBRARY_PATH")
else()
set(ld_env_name "LD_LIBRARY_PATH")
endif()
set(authz_venv_dir ${CMAKE_CURRENT_BINARY_DIR}/authorization_test_venv)
set(authz_venv_activate ". ${authz_venv_dir}/bin/activate")
set(authz_venv_stamp_file ${authz_venv_dir}/venv.ready)
@ -422,15 +427,13 @@ if(WITH_PYTHON)
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
set_tests_properties(authorization_venv_setup PROPERTIES FIXTURES_SETUP authz_virtual_env TIMEOUT 60)
set(authz_script_dir ${CMAKE_SOURCE_DIR}/tests/authorization)
set(authz_test_cmd "")
string(APPEND authz_test_cmd "${authz_venv_activate} && ")
string(APPEND authz_test_cmd "LD_LIBRARY_PATH=${CMAKE_BINARY_DIR}/lib pytest ${authz_script_dir}/authz_test.py -rA --build-dir ${CMAKE_BINARY_DIR} -vvv")
set(authz_script_dir ${CMAKE_SOURCE_DIR}/tests/authorization)
set(authz_test_cmd "${authz_venv_activate} && pytest ${authz_script_dir}/authz_test.py -rA --build-dir ${CMAKE_BINARY_DIR} -vvv")
add_test(
NAME token_based_tenant_authorization
WORKING_DIRECTORY ${authz_script_dir}
WORKING_DIRECTORY ${authz_venv_dir}
COMMAND bash -c ${authz_test_cmd})
set_tests_properties(token_based_tenant_authorization PROPERTIES ENVIRONMENT PYTHONPATH=${CMAKE_SOURCE_DIR}/tests/TestRunner) # (local|tmp)_cluster.py
set_tests_properties(token_based_tenant_authorization PROPERTIES ENVIRONMENT "PYTHONPATH=${CMAKE_SOURCE_DIR}/tests/TestRunner;${ld_env_name}=${CMAKE_BINARY_DIR}/lib")
set_tests_properties(token_based_tenant_authorization PROPERTIES FIXTURES_REQUIRED authz_virtual_env)
set_tests_properties(token_based_tenant_authorization PROPERTIES TIMEOUT 120)
endif()