Merge pull request #2555 from etschannen/master

Merge Release 6.2 into Master
Evan Tschannen 2020-01-16 18:58:07 -08:00 committed by GitHub
commit 725ab1b996
43 changed files with 542 additions and 361 deletions

View File

@ -347,7 +347,7 @@ bool FDBLibTLSSession::verify_peer() {
if(now() - lastVerifyFailureLogged > 1.0) {
for (std::string reason : verify_failure_reasons) {
lastVerifyFailureLogged = now();
TraceEvent("FDBLibTLSVerifyFailure", uid).detail("Reason", reason);
TraceEvent("FDBLibTLSVerifyFailure", uid).suppressFor(1.0).detail("Reason", reason);
}
}
}

View File

@ -133,10 +133,10 @@ function(strip_debug_symbols target)
COMMENT "Stripping symbols from ${target}")
set(out_files "${out_file}")
if(is_exec AND NOT APPLE)
add_custom_command(OUTPUT "${out_file}.debug"
COMMAND objcopy --only-keep-debug $<TARGET_FILE:${target}> "${out_file}.debug" &&
objcopy --add-gnu-debuglink="${out_file}.debug" ${out_file}
COMMENT "Copy debug symbols to ${out_name}.debug")
add_custom_command(OUTPUT "${out_file}.debug"
COMMAND objcopy --verbose --only-keep-debug $<TARGET_FILE:${target}> "${out_file}.debug"
COMMAND objcopy --verbose --add-gnu-debuglink="${out_file}.debug" "${out_file}"
COMMENT "Copy debug symbols to ${out_name}.debug")
list(APPEND out_files "${out_file}.debug")
endif()
add_custom_target(strip_${target} DEPENDS ${out_files})

View File

@ -492,6 +492,19 @@ If a process has had more than 10 TCP segments retransmitted in the last 5 secon
10.0.4.1:4500 ( 3% cpu; 2% machine; 0.004 Gbps; 0% disk; REXMIT! 2.5 GB / 4.1 GB RAM )
Machine-readable status
--------------------------------
The status command can provide a complete summary of statistics about the cluster and the database with the ``json`` argument. Full documentation for ``status json`` output can be found :doc:`here <mr-status>`.
From the output of ``status json``, operators can find useful health metrics to determine whether or not their cluster is hitting performance limits.
====================== ==============================================================================================================
Ratekeeper limit       ``cluster.qos.transactions_per_second_limit`` contains the number of read versions per second that the cluster can give out. A low ratekeeper limit indicates that the cluster performance is limited in some way. The reason for a low ratekeeper limit can be found at ``cluster.qos.performance_limited_by``. ``cluster.qos.released_transactions_per_second`` describes the number of read versions given out per second, and can be used to tell how close the ratekeeper is to throttling.
Storage queue size     ``cluster.qos.worst_queue_bytes_storage_server`` contains the maximum size in bytes of a storage queue. Each storage server has mutations that have not yet been made durable, stored in its storage queue. If this value gets too large, it indicates a storage server is falling behind. A large storage queue will cause the ratekeeper to increase throttling. However, depending on the configuration, the ratekeeper can ignore the worst storage queue from one fault domain. Thus, ratekeeper uses ``cluster.qos.limiting_queue_bytes_storage_server`` to determine the throttling level.
Durable version lag    ``cluster.qos.worst_durability_lag_storage_server`` contains information about the worst storage server durability lag. The ``versions`` subfield contains the maximum number of versions in a storage queue. Ideally, this should be near 5 million. The ``seconds`` subfield contains the maximum number of seconds of non-durable data in a storage queue. Ideally, this should be near 5 seconds. If a storage server is overwhelmed, the durability lag could rise, causing performance issues.
Transaction log queue  ``cluster.qos.worst_queue_bytes_log_server`` contains the maximum size in bytes of the mutations stored on a transaction log that have not yet been popped by storage servers. A large transaction log queue size can potentially cause the ratekeeper to increase throttling.
====================== ==============================================================================================================
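The same document returned by ``status json`` is also exposed to clients at the special key ``\xff\xff/status/json``, so these fields can be read programmatically. Below is a minimal Python sketch; the API version and the 90% threshold are illustrative assumptions, while the field names come from the table above.

.. code-block:: python

    import json
    import fdb

    fdb.api_version(620)  # assumed; use the API version your binding supports
    db = fdb.open()       # default cluster file

    # ``status json`` is also exposed to clients at this special key.
    status = json.loads(db[b'\xff\xff/status/json'])
    qos = status["cluster"]["qos"]

    limit = qos["transactions_per_second_limit"]
    released = qos["released_transactions_per_second"]

    # A released rate close to the limit means ratekeeper is close to throttling.
    if limit > 0 and released > 0.9 * limit:
        print("Near ratekeeper limit; limited by:", qos["performance_limited_by"]["name"])

    print("Worst storage queue bytes:", qos["worst_queue_bytes_storage_server"])
    print("Worst log queue bytes:", qos["worst_queue_bytes_log_server"])

The same JSON can also be produced on the command line with ``status json`` in ``fdbcli`` and piped to any JSON tooling.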
.. _administration_fdbmonitor:
``fdbmonitor`` and ``fdbserver``

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.2.11.pkg <https://www.foundationdb.org/downloads/6.2.11/macOS/installers/FoundationDB-6.2.11.pkg>`_
* `FoundationDB-6.2.13.pkg <https://www.foundationdb.org/downloads/6.2.13/macOS/installers/FoundationDB-6.2.13.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.2.11-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.11/ubuntu/installers/foundationdb-clients_6.2.11-1_amd64.deb>`_
* `foundationdb-server-6.2.11-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.11/ubuntu/installers/foundationdb-server_6.2.11-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.2.13-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.13/ubuntu/installers/foundationdb-clients_6.2.13-1_amd64.deb>`_
* `foundationdb-server-6.2.13-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.13/ubuntu/installers/foundationdb-server_6.2.13-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.2.11-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel6/installers/foundationdb-clients-6.2.11-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.11-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel6/installers/foundationdb-server-6.2.11-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.13-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.13/rhel6/installers/foundationdb-clients-6.2.13-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.2.13-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.13/rhel6/installers/foundationdb-server-6.2.13-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.2.11-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel7/installers/foundationdb-clients-6.2.11-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.11-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.11/rhel7/installers/foundationdb-server-6.2.11-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.2.13-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.13/rhel7/installers/foundationdb-clients-6.2.13-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.2.13-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.13/rhel7/installers/foundationdb-server-6.2.13-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.2.11-x64.msi <https://www.foundationdb.org/downloads/6.2.11/windows/installers/foundationdb-6.2.11-x64.msi>`_
* `foundationdb-6.2.13-x64.msi <https://www.foundationdb.org/downloads/6.2.13/windows/installers/foundationdb-6.2.13-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
* `foundationdb-6.2.11.tar.gz <https://www.foundationdb.org/downloads/6.2.11/bindings/python/foundationdb-6.2.11.tar.gz>`_
* `foundationdb-6.2.13.tar.gz <https://www.foundationdb.org/downloads/6.2.13/bindings/python/foundationdb-6.2.13.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.2.11.gem <https://www.foundationdb.org/downloads/6.2.11/bindings/ruby/fdb-6.2.11.gem>`_
* `fdb-6.2.13.gem <https://www.foundationdb.org/downloads/6.2.13/bindings/ruby/fdb-6.2.13.gem>`_
Java 8+
-------
* `fdb-java-6.2.11.jar <https://www.foundationdb.org/downloads/6.2.11/bindings/java/fdb-java-6.2.11.jar>`_
* `fdb-java-6.2.11-javadoc.jar <https://www.foundationdb.org/downloads/6.2.11/bindings/java/fdb-java-6.2.11-javadoc.jar>`_
* `fdb-java-6.2.13.jar <https://www.foundationdb.org/downloads/6.2.13/bindings/java/fdb-java-6.2.13.jar>`_
* `fdb-java-6.2.13-javadoc.jar <https://www.foundationdb.org/downloads/6.2.13/bindings/java/fdb-java-6.2.13-javadoc.jar>`_
Go 1.11+
--------

View File

@ -138,12 +138,14 @@ Default Peer Verification
The default peer verification is ``Check.Valid=1``.
Default Password
^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^
There is no default password. If no password is specified, it is assumed that the private key is unencrypted.
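For client processes, a passphrase can be supplied through the ``TLS_password`` network option before the key is loaded. The following is a minimal Python sketch; the file paths and passphrase are placeholders, and the option is simply omitted when the private key is unencrypted.

.. code-block:: python

    import fdb

    fdb.api_version(620)  # assumed API version

    # TLS network options must be set before the network starts (i.e. before fdb.open()).
    fdb.options.set_tls_ca_path("/etc/foundationdb/ca.pem")            # placeholder path
    fdb.options.set_tls_password("example-passphrase")                 # set before the key; omit if unencrypted
    fdb.options.set_tls_key_path("/etc/foundationdb/client-key.pem")   # placeholder path
    fdb.options.set_tls_cert_path("/etc/foundationdb/client-cert.pem") # placeholder path

    db = fdb.open()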
Parameters and client bindings
------------------------------
Permissions
-----------
All files used by TLS must have sufficient read permissions such that the user running the FoundationDB server or client process can access them. It may also be necessary to have similar read permissions on the parent directories of the files used in the TLS configuration.
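As a quick sanity check, something along the following lines can confirm that the configured files are readable by the FoundationDB user. The service user name and file paths are placeholders, and the sketch does not walk the parent directories mentioned above.

.. code-block:: python

    import os
    import pwd
    import stat

    service_user = pwd.getpwnam("foundationdb")    # placeholder user name
    tls_files = ["/etc/foundationdb/fdb.pem",      # placeholder paths
                 "/etc/foundationdb/ca.pem"]

    for path in tls_files:
        st = os.stat(path)
        readable = (
            (st.st_uid == service_user.pw_uid and st.st_mode & stat.S_IRUSR) or
            (st.st_gid == service_user.pw_gid and st.st_mode & stat.S_IRGRP) or
            (st.st_mode & stat.S_IROTH)
        )
        if not readable:
            print("WARNING: %s is not readable by user %s" % (path, service_user.pw_name))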
Automatic TLS certificate refresh
---------------------------------

View File

@ -2587,6 +2587,27 @@ Future<T> stopNetworkAfter( Future<T> what ) {
}
}
ACTOR Future<Void> addInterface( std::map<Key,std::pair<Value,ClientLeaderRegInterface>>* address_interface, Reference<FlowLock> connectLock, KeyValue kv) {
wait(connectLock->take());
state FlowLock::Releaser releaser(*connectLock);
state ClientWorkerInterface workerInterf = BinaryReader::fromStringRef<ClientWorkerInterface>(kv.value, IncludeVersion());
state ClientLeaderRegInterface leaderInterf(workerInterf.address());
choose {
when( Optional<LeaderInfo> rep = wait( brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())) ) ) {
StringRef ip_port = kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key;
(*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf);
if(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) {
Key full_ip_port2 = StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString());
StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls")) ? full_ip_port2.removeSuffix(LiteralStringRef(":tls")) : full_ip_port2;
(*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf);
}
}
when( wait(delay(1.0)) ) {}
}
return Void();
}
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state LineNoise& linenoise = *plinenoise;
state bool intrans = false;
@ -2597,7 +2618,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state bool writeMode = false;
state std::string clusterConnectString;
state std::map<Key,Value> address_interface;
state std::map<Key,std::pair<Value,ClientLeaderRegInterface>> address_interface;
state FdbOptions globalOptions;
state FdbOptions activeOptions;
@ -2990,10 +3011,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
auto ip_port = it.key.endsWith(LiteralStringRef(":tls")) ? it.key.removeSuffix(LiteralStringRef(":tls")) : it.key;
address_interface[ip_port] = it.value;
addInterfs.push_back(addInterface(&address_interface, connectLock, it));
}
wait( waitForAll(addInterfs) );
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if(address_interface.size() == 0) {
@ -3009,7 +3032,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
} else if (tokencmp(tokens[1], "all")) {
for( auto it : address_interface ) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second);
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to kill. You must run the `kill command before running `kill all.\n");
@ -3027,7 +3050,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(!is_error) {
for(int i = 1; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]]);
tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]].first);
}
printf("Attempted to kill %zu processes\n", tokens.size() - 1);
}
@ -3302,9 +3325,12 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
getTransaction(db, tr, options, intrans);
if (tokens.size() == 1) {
Standalone<RangeResultRef> kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) );
Reference<FlowLock> connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM));
std::vector<Future<Void>> addInterfs;
for( auto it : kvs ) {
address_interface[it.key] = it.value;
addInterfs.push_back(addInterface(&address_interface, connectLock, it));
}
wait( waitForAll(addInterfs) );
}
if (tokens.size() == 1 || tokencmp(tokens[1], "list")) {
if(address_interface.size() == 0) {
@ -3320,7 +3346,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("\n");
} else if (tokencmp(tokens[1], "all")) {
for( auto it : address_interface ) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second);
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first);
}
if (address_interface.size() == 0) {
printf("ERROR: no processes to check. You must run the `expensive_data_check command before running `expensive_data_check all.\n");
@ -3338,7 +3364,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
if(!is_error) {
for(int i = 1; i < tokens.size(); i++) {
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]]);
tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]].first);
}
printf("Attempted to kill and check %zu processes\n", tokens.size() - 1);
}

View File

@ -85,10 +85,10 @@
<PreprocessorDefinitions>TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;_DEBUG;_HAS_ITERATOR_DEBUGGING=0;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\zookeeper\win32;..\zookeeper\generated;..\zookeeper\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<AdditionalOptions>/bigobj @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
<MinimalRebuild>false</MinimalRebuild>
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
<AdditionalOptions> @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
<LanguageStandard>stdcpp17</LanguageStandard>
<LanguageStandard>stdcpp17</LanguageStandard>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
@ -110,12 +110,12 @@
<PreprocessorDefinitions>TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\zookeeper\win32;..\zookeeper\generated;..\zookeeper\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<AdditionalOptions>/bigobj @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
<BufferSecurityCheck>false</BufferSecurityCheck>
<EnableEnhancedInstructionSet>StreamingSIMDExtensions2</EnableEnhancedInstructionSet>
<AdditionalOptions> @../flow/no_intellisense.opt %(AdditionalOptions)</AdditionalOptions>
<LanguageStandard>stdcpp17</LanguageStandard>
<LanguageStandard>stdcpp17</LanguageStandard>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>

View File

@ -45,7 +45,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( COORDINATOR_RECONNECTION_DELAY, 1.0 );
init( CLIENT_EXAMPLE_AMOUNT, 20 );
init( MAX_CLIENT_STATUS_AGE, 1.0 );
init( MAX_CLIENT_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) MAX_CLIENT_PROXY_CONNECTIONS = 1;
init( MAX_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) MAX_PROXY_CONNECTIONS = 1;
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
@ -76,6 +76,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( GET_RANGE_SHARD_LIMIT, 2 );
init( WARM_RANGE_SHARD_LIMIT, 100 );
init( STORAGE_METRICS_SHARD_LIMIT, 100 ); if( randomize && BUGGIFY ) STORAGE_METRICS_SHARD_LIMIT = 3;
init( SHARD_COUNT_LIMIT, 80 ); if( randomize && BUGGIFY ) SHARD_COUNT_LIMIT = 3;
init( STORAGE_METRICS_UNFAIR_SPLIT_LIMIT, 2.0/3.0 );
init( STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, 15.0 );
init( AGGREGATE_HEALTH_METRICS_MAX_STALENESS, 0.5 );
@ -197,6 +198,9 @@ ClientKnobs::ClientKnobs(bool randomize) {
}
init(CSI_STATUS_DELAY, 10.0 );
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec
init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec
init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days
//fdbcli
init( CLI_CONNECT_PARALLELISM, 20 );
}

View File

@ -44,7 +44,7 @@ public:
double COORDINATOR_RECONNECTION_DELAY;
int CLIENT_EXAMPLE_AMOUNT;
double MAX_CLIENT_STATUS_AGE;
int MAX_CLIENT_PROXY_CONNECTIONS;
int MAX_PROXY_CONNECTIONS;
// wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin
double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. dumping the database after a test)
@ -75,6 +75,7 @@ public:
int GET_RANGE_SHARD_LIMIT;
int WARM_RANGE_SHARD_LIMIT;
int STORAGE_METRICS_SHARD_LIMIT;
int SHARD_COUNT_LIMIT;
double STORAGE_METRICS_UNFAIR_SPLIT_LIMIT;
double STORAGE_METRICS_TOO_MANY_SHARDS_DELAY;
double AGGREGATE_HEALTH_METRICS_MAX_STALENESS;
@ -189,6 +190,9 @@ public:
int CONSISTENCY_CHECK_RATE_LIMIT_MAX;
int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME;
//fdbcli
int CLI_CONNECT_PARALLELISM;
ClientKnobs(bool randomize = false);
};

View File

@ -670,6 +670,25 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
}
}
void shrinkProxyList( ClientDBInfo& ni, std::vector<UID>& lastProxyUIDs, std::vector<MasterProxyInterface>& lastProxies ) {
if(ni.proxies.size() > CLIENT_KNOBS->MAX_PROXY_CONNECTIONS) {
std::vector<UID> proxyUIDs;
for(auto& proxy : ni.proxies) {
proxyUIDs.push_back(proxy.id());
}
if(proxyUIDs != lastProxyUIDs) {
lastProxyUIDs = proxyUIDs;
lastProxies = ni.proxies;
deterministicRandom()->randomShuffle(lastProxies);
lastProxies.resize(CLIENT_KNOBS->MAX_PROXY_CONNECTIONS);
for(int i = 0; i < lastProxies.size(); i++) {
TraceEvent("ConnectedProxy").detail("Proxy", lastProxies[i].id());
}
}
ni.proxies = lastProxies;
}
}
// Leader is the process that will be elected by coordinators as the cluster controller
ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterConnectionFile> connFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, MonitorLeaderInfo info, Standalone<VectorRef<ClientVersionRef>> supportedVersions, Key traceLogGroup) {
state ClusterConnectionString cs = info.intermediateConnFile->getConnectionString();
@ -730,24 +749,8 @@ ACTOR Future<MonitorLeaderInfo> monitorProxiesOneGeneration( Reference<ClusterCo
connFile->notifyConnected();
auto& ni = rep.get().mutate();
if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) {
std::vector<UID> proxyUIDs;
for(auto& proxy : ni.proxies) {
proxyUIDs.push_back(proxy.id());
}
if(proxyUIDs != lastProxyUIDs) {
lastProxyUIDs = proxyUIDs;
lastProxies = ni.proxies;
deterministicRandom()->randomShuffle(lastProxies);
lastProxies.resize(CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS);
for(int i = 0; i < lastProxies.size(); i++) {
TraceEvent("ClientConnectedProxy").detail("Proxy", lastProxies[i].id());
}
}
ni.proxies = lastProxies;
}
clientInfo->set( rep.get().read() );
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
clientInfo->set( ni );
successIdx = idx;
} else if(idx == successIdx) {
wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY));

View File

@ -59,6 +59,8 @@ Future<Void> monitorLeaderForProxies( Value const& key, vector<NetworkAddress> c
Future<Void> monitorProxies( Reference<AsyncVar<Reference<ClusterConnectionFile>>> const& connFile, Reference<AsyncVar<ClientDBInfo>> const& clientInfo, Standalone<VectorRef<ClientVersionRef>> const& supportedVersions, Key const& traceLogGroup );
void shrinkProxyList( ClientDBInfo& ni, std::vector<UID>& lastProxyUIDs, std::vector<MasterProxyInterface>& lastProxies );
#pragma region Implementation
Future<Void> monitorLeaderInternal( Reference<ClusterConnectionFile> const& connFile, Reference<AsyncVar<Value>> const& outSerializedLeaderInfo );

View File

@ -1453,6 +1453,17 @@ ACTOR Future<Version> waitForCommittedVersion( Database cx, Version version ) {
}
}
ACTOR Future<Version> getRawVersion( Database cx ) {
loop {
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE ), cx->taskID ) ) ) {
return v.version;
}
}
}
}
ACTOR Future<Void> readVersionBatcher(
DatabaseContext* cx, FutureStream<std::pair<Promise<GetReadVersionReply>, Optional<UID>>> versionStream,
uint32_t flags);
@ -2133,6 +2144,10 @@ ACTOR Future<Void> watch( Reference<Watch> watch, Database cx, Transaction *self
return Void();
}
Future<Version> Transaction::getRawReadVersion() {
return ::getRawVersion(cx);
}
Future< Void > Transaction::watch( Reference<Watch> watch ) {
return ::watch(watch, cx, this);
}
@ -3215,16 +3230,25 @@ ACTOR Future< StorageMetrics > waitStorageMetricsMultipleLocations(
}
}
ACTOR Future< StorageMetrics > waitStorageMetrics(
ACTOR Future< StorageMetrics > extractMetrics( Future<std::pair<Optional<StorageMetrics>, int>> fMetrics ) {
std::pair<Optional<StorageMetrics>, int> x = wait(fMetrics);
return x.first.get();
}
ACTOR Future< std::pair<Optional<StorageMetrics>, int> > waitStorageMetrics(
Database cx,
KeyRange keys,
StorageMetrics min,
StorageMetrics max,
StorageMetrics permittedError,
int shardLimit )
int shardLimit,
int expectedShardCount )
{
loop {
vector< pair<KeyRange, Reference<LocationInfo>> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution) ) );
if(expectedShardCount >= 0 && locations.size() != expectedShardCount) {
return std::make_pair(Optional<StorageMetrics>(), locations.size());
}
//SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this.
if(locations.size() < shardLimit) {
@ -3237,7 +3261,7 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskPriority::DataDistribution );
}
StorageMetrics x = wait(fx);
return x;
return std::make_pair(x,-1);
} catch (Error& e) {
if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) {
TraceEvent(SevError, "WaitStorageMetricsError").error(e);
@ -3258,20 +3282,21 @@ ACTOR Future< StorageMetrics > waitStorageMetrics(
}
}
Future< StorageMetrics > Transaction::waitStorageMetrics(
Future< std::pair<Optional<StorageMetrics>, int> > Transaction::waitStorageMetrics(
KeyRange const& keys,
StorageMetrics const& min,
StorageMetrics const& max,
StorageMetrics const& permittedError,
int shardLimit )
int shardLimit,
int expectedShardCount )
{
return ::waitStorageMetrics( cx, keys, min, max, permittedError, shardLimit );
return ::waitStorageMetrics( cx, keys, min, max, permittedError, shardLimit, expectedShardCount );
}
Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, int shardLimit ) {
StorageMetrics m;
m.bytes = -1;
return ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit );
return extractMetrics( ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit, -1 ) );
}
ACTOR Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated )

View File

@ -211,6 +211,7 @@ public:
void setVersion( Version v );
Future<Version> getReadVersion() { return getReadVersion(0); }
Future<Version> getRawReadVersion();
[[nodiscard]] Future<Optional<Value>> get(const Key& key, bool snapshot = false);
[[nodiscard]] Future<Void> watch(Reference<Watch> watch);
@ -241,7 +242,7 @@ public:
Future< Void > warmRange( Database cx, KeyRange keys );
Future< StorageMetrics > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit );
Future< std::pair<Optional<StorageMetrics>, int> > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit, int expectedShardCount );
Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit );
Future< Standalone<VectorRef<KeyRef>> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated );

View File

@ -282,15 +282,13 @@ struct GetShardStateRequest {
struct StorageMetrics {
constexpr static FileIdentifier file_identifier = 13622226;
int64_t bytes; // total storage
int64_t bytesPerKSecond; // network bandwidth (average over 10s)
int64_t iosPerKSecond;
int64_t bytesReadPerKSecond;
int64_t bytes = 0; // total storage
int64_t bytesPerKSecond = 0; // network bandwidth (average over 10s)
int64_t iosPerKSecond = 0;
int64_t bytesReadPerKSecond = 0;
static const int64_t infinity = 1LL<<60;
StorageMetrics() : bytes(0), bytesPerKSecond(0), iosPerKSecond(0), bytesReadPerKSecond(0) {}
bool allLessOrEqual( const StorageMetrics& rhs ) const {
return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond &&
bytesReadPerKSecond <= rhs.bytesReadPerKSecond;

View File

@ -416,7 +416,7 @@ public:
++ctx.countAIOSubmit;
double elapsed = timer_monotonic() - begin;
g_network->networkMetrics.secSquaredSubmit += elapsed*elapsed/2;
g_network->networkInfo.metrics.secSquaredSubmit += elapsed*elapsed/2;
//TraceEvent("Launched").detail("N", rc).detail("Queued", ctx.queue.size()).detail("Elapsed", elapsed).detail("Outstanding", ctx.outstanding+rc);
//printf("launched: %d/%d in %f us (%d outstanding; lowest prio %d)\n", rc, ctx.queue.size(), elapsed*1e6, ctx.outstanding + rc, toStart[n-1]->getTask());
@ -672,7 +672,7 @@ private:
double t = timer_monotonic();
double elapsed = t - ctx.ioStallBegin;
ctx.ioStallBegin = t;
g_network->networkMetrics.secSquaredDiskStall += elapsed*elapsed/2;
g_network->networkInfo.metrics.secSquaredDiskStall += elapsed*elapsed/2;
}
ctx.outstanding -= n;

View File

@ -321,28 +321,28 @@ Future< REPLY_TYPE(Request) > loadBalance(
}
if(!alternatives->alwaysFresh()) {
if(now() - g_network->networkMetrics.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) {
g_network->networkMetrics.oldestAlternativesFailure = now();
if(now() - g_network->networkInfo.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) {
g_network->networkInfo.oldestAlternativesFailure = now();
}
double delay = FLOW_KNOBS->ALTERNATIVES_FAILURE_MIN_DELAY;
if(now() - g_network->networkMetrics.lastAlternativesFailureSkipDelay > FLOW_KNOBS->ALTERNATIVES_FAILURE_SKIP_DELAY) {
g_network->networkMetrics.lastAlternativesFailureSkipDelay = now();
if(now() - g_network->networkInfo.lastAlternativesFailureSkipDelay > FLOW_KNOBS->ALTERNATIVES_FAILURE_SKIP_DELAY) {
g_network->networkInfo.lastAlternativesFailureSkipDelay = now();
} else {
double elapsed = now()-g_network->networkMetrics.oldestAlternativesFailure;
double elapsed = now()-g_network->networkInfo.oldestAlternativesFailure;
delay = std::max(delay, std::min(elapsed*FLOW_KNOBS->ALTERNATIVES_FAILURE_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_MAX_DELAY));
delay = std::max(delay, std::min(elapsed*FLOW_KNOBS->ALTERNATIVES_FAILURE_SLOW_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_SLOW_MAX_DELAY));
}
// Making this SevWarn means a lot of clutter
if(now() - g_network->networkMetrics.newestAlternativesFailure > 1 || deterministicRandom()->random01() < 0.01) {
if(now() - g_network->networkInfo.newestAlternativesFailure > 1 || deterministicRandom()->random01() < 0.01) {
TraceEvent("AllAlternativesFailed")
.detail("Interval", FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED)
.detail("Alternatives", alternatives->description())
.detail("Delay", delay);
}
g_network->networkMetrics.newestAlternativesFailure = now();
g_network->networkInfo.newestAlternativesFailure = now();
choose {
when ( wait( quorum( ok, 1 ) ) ) {}

View File

@ -71,11 +71,29 @@ static int recv_func(void* ctx, uint8_t* buf, int len) {
}
ACTOR static Future<Void> handshake( TLSConnection* self ) {
state std::pair<IPAddress,uint16_t> peerIP = std::make_pair(self->conn->getPeerAddress().ip, self->is_client ? self->conn->getPeerAddress().port : static_cast<uint16_t>(0));
if(!self->is_client) {
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
if (now() < iter->second) {
TraceEvent("TLSIncomingConnectionThrottlingWarning", self->getDebugID()).suppressFor(1.0).detail("PeerIP", peerIP.first.toString());
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT));
throw connection_failed();
} else {
g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP);
}
}
}
loop {
int r = self->session->handshake();
if(BUGGIFY_WITH_PROB(0.001)) {
r = ITLSSession::FAILED;
}
if ( r == ITLSSession::SUCCESS ) break;
if ( r == ITLSSession::FAILED ) {
TraceEvent("TLSConnectionHandshakeError", self->getDebugID()).suppressFor(1.0).detail("Peer", self->getPeerAddress());
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = now() + (self->is_client ? FLOW_KNOBS->TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT : FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT);
throw connection_failed();
}
ASSERT( r == ITLSSession::WANT_WRITE || r == ITLSSession::WANT_READ );
@ -87,7 +105,7 @@ ACTOR static Future<Void> handshake( TLSConnection* self ) {
return Void();
}
TLSConnection::TLSConnection( Reference<IConnection> const& conn, Reference<ITLSPolicy> const& policy, bool is_client, std::string host) : conn(conn), write_wants(0), read_wants(0), uid(conn->getDebugID()) {
TLSConnection::TLSConnection( Reference<IConnection> const& conn, Reference<ITLSPolicy> const& policy, bool is_client, std::string host) : conn(conn), write_wants(0), read_wants(0), uid(conn->getDebugID()), is_client(is_client) {
const char * serverName = host.empty() ? NULL : host.c_str();
session = Reference<ITLSSession>( policy->create_session(is_client, serverName, send_func, this, recv_func, this, (void*)&uid) );
if ( !session ) {
@ -169,9 +187,25 @@ TLSNetworkConnections::TLSNetworkConnections( Reference<TLSOptions> options ) :
g_network->setGlobal(INetwork::enumGlobal::enNetworkConnections, (flowGlobalType) this);
}
ACTOR Future<Reference<IConnection>> waitAndFailConnection() {
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT));
throw connection_failed();
}
Future<Reference<IConnection>> TLSNetworkConnections::connect( NetworkAddress toAddr, std::string host) {
if ( toAddr.isTLS() ) {
NetworkAddress clearAddr( toAddr.ip, toAddr.port, toAddr.isPublic(), false );
std::pair<IPAddress,uint16_t> peerIP = std::make_pair(toAddr.ip, toAddr.port);
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
if (now() < iter->second) {
TraceEvent("TLSOutgoingConnectionThrottlingWarning").suppressFor(1.0).detail("PeerIP", toAddr);
return waitAndFailConnection();
} else {
g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP);
}
}
TraceEvent("TLSConnectionConnecting").suppressFor(1.0).detail("ToAddr", toAddr);
// For FDB<->FDB connections, we don't have hostnames and can't verify IP
// addresses against certificates, so we have our own peer verifying logic

View File

@ -36,6 +36,7 @@ struct TLSConnection : IConnection, ReferenceCounted<TLSConnection> {
int write_wants, read_wants;
UID uid;
bool is_client;
virtual void addref() { ReferenceCounted<TLSConnection>::addref(); }
virtual void delref() { ReferenceCounted<TLSConnection>::delref(); }

View File

@ -1170,7 +1170,7 @@ public:
std::transform(newTLogs.begin(), newTLogs.end(), std::back_inserter(exclusionWorkerIds), fn);
std::transform(newSatelliteTLogs.begin(), newSatelliteTLogs.end(), std::back_inserter(exclusionWorkerIds), fn);
RoleFitness newRemoteTLogFit(
(db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ?
(db.config.usableRegions > 1 && (dbi.recoveryState == RecoveryState::ALL_LOGS_RECRUITED || dbi.recoveryState == RecoveryState::FULLY_RECOVERED)) ?
getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC, exclusionWorkerIds)
: remote_tlogs, ProcessClass::TLog);
if(oldRemoteTLogFit < newRemoteTLogFit) return false;

View File

@ -774,59 +774,28 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
int64_t bestLoadBytes = 0;
Optional<Reference<IDataDistributionTeam>> bestOption;
std::vector<std::pair<int, Reference<IDataDistributionTeam>>> randomTeams;
std::set< UID > sources;
std::vector<Reference<IDataDistributionTeam>> randomTeams;
const std::set<UID> completeSources(req.completeSources.begin(), req.completeSources.end());
if( !req.wantsNewServers ) {
std::vector<Reference<IDataDistributionTeam>> similarTeams;
bool foundExact = false;
for( int i = 0; i < req.sources.size(); i++ )
sources.insert( req.sources[i] );
for( int i = 0; i < req.sources.size(); i++ ) {
if( self->server_info.count( req.sources[i] ) ) {
auto& teamList = self->server_info[ req.sources[i] ]->teams;
for( int j = 0; j < teamList.size(); j++ ) {
if( teamList[j]->isHealthy() && (!req.preferLowerUtilization || teamList[j]->hasHealthyFreeSpace())) {
int sharedMembers = 0;
for( const UID& id : teamList[j]->getServerIDs() )
if( sources.count( id ) )
sharedMembers++;
if( !foundExact && sharedMembers == teamList[j]->size() ) {
foundExact = true;
bestOption = Optional<Reference<IDataDistributionTeam>>();
similarTeams.clear();
}
if( (sharedMembers == teamList[j]->size()) || (!foundExact && req.wantsTrueBest) ) {
int64_t loadBytes = SOME_SHARED * teamList[j]->getLoadBytes(true, req.inflightPenalty);
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
bestLoadBytes = loadBytes;
bestOption = teamList[j];
}
}
else if( !req.wantsTrueBest && !foundExact )
similarTeams.push_back( teamList[j] );
for( int i = 0; i < req.completeSources.size(); i++ ) {
if( !self->server_info.count( req.completeSources[i] ) ) {
continue;
}
auto& teamList = self->server_info[ req.completeSources[i] ]->teams;
for( int j = 0; j < teamList.size(); j++ ) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for( int k = 0; k < teamList[j]->size(); k++ ) {
if( !completeSources.count( serverIDs[k] ) ) {
found = false;
break;
}
}
}
}
if( foundExact || (req.wantsTrueBest && bestOption.present() ) ) {
ASSERT( bestOption.present() );
// Check the team size: be sure team size is correct
ASSERT(bestOption.get()->size() == self->configuration.storageTeamSize);
req.reply.send( bestOption );
return Void();
}
if( !req.wantsTrueBest ) {
while( similarTeams.size() && randomTeams.size() < SERVER_KNOBS->BEST_TEAM_OPTION_COUNT ) {
int randomTeam = deterministicRandom()->randomInt( 0, similarTeams.size() );
randomTeams.push_back( std::make_pair( SOME_SHARED, similarTeams[randomTeam] ) );
swapAndPop( &similarTeams, randomTeam );
if(found && teamList[j]->isHealthy()) {
req.reply.send( teamList[j] );
return Void();
}
}
}
}
@ -835,7 +804,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
ASSERT( !bestOption.present() );
for( int i = 0; i < self->teams.size(); i++ ) {
if( self->teams[i]->isHealthy() && (!req.preferLowerUtilization || self->teams[i]->hasHealthyFreeSpace()) ) {
int64_t loadBytes = NONE_SHARED * self->teams[i]->getLoadBytes(true, req.inflightPenalty);
int64_t loadBytes = self->teams[i]->getLoadBytes(true, req.inflightPenalty);
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
bestLoadBytes = loadBytes;
bestOption = self->teams[i];
@ -850,12 +819,15 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
Reference<IDataDistributionTeam> dest = deterministicRandom()->randomChoice(self->teams);
bool ok = dest->isHealthy() && (!req.preferLowerUtilization || dest->hasHealthyFreeSpace());
for(int i=0; ok && i<randomTeams.size(); i++)
if (randomTeams[i].second->getServerIDs() == dest->getServerIDs())
for(int i=0; ok && i<randomTeams.size(); i++) {
if (randomTeams[i]->getServerIDs() == dest->getServerIDs()) {
ok = false;
break;
}
}
if (ok)
randomTeams.push_back( std::make_pair( NONE_SHARED, dest ) );
randomTeams.push_back( dest );
else
nTries++;
}
@ -866,10 +838,10 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
for( int i = 0; i < randomTeams.size(); i++ ) {
int64_t loadBytes = randomTeams[i].first * randomTeams[i].second->getLoadBytes(true, req.inflightPenalty);
int64_t loadBytes = randomTeams[i]->getLoadBytes(true, req.inflightPenalty);
if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) {
bestLoadBytes = loadBytes;
bestOption = randomTeams[i].second;
bestOption = randomTeams[i];
}
}
}
@ -878,30 +850,24 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// We will get stuck at this! This only happens when a DC fails. No need to consider it right now.
if(!bestOption.present() && self->zeroHealthyTeams->get()) {
//Attempt to find the unhealthy source server team and return it
std::set<UID> completeSources;
for( int i = 0; i < req.completeSources.size(); i++ ) {
completeSources.insert( req.completeSources[i] );
}
int bestSize = 0;
for( int i = 0; i < req.completeSources.size(); i++ ) {
if( self->server_info.count( req.completeSources[i] ) ) {
auto& teamList = self->server_info[ req.completeSources[i] ]->teams;
for( int j = 0; j < teamList.size(); j++ ) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for( int k = 0; k < teamList[j]->size(); k++ ) {
if( !completeSources.count( serverIDs[k] ) ) {
found = false;
break;
}
}
if(found && teamList[j]->size() > bestSize) {
bestOption = teamList[j];
bestSize = teamList[j]->size();
if( !self->server_info.count( req.completeSources[i] ) ) {
continue;
}
auto& teamList = self->server_info[ req.completeSources[i] ]->teams;
for( int j = 0; j < teamList.size(); j++ ) {
bool found = true;
auto serverIDs = teamList[j]->getServerIDs();
for( int k = 0; k < teamList[j]->size(); k++ ) {
if( !completeSources.count( serverIDs[k] ) ) {
found = false;
break;
}
}
break;
if(found) {
req.reply.send( teamList[j] );
return Void();
}
}
}
}

View File

@ -38,11 +38,6 @@ struct RelocateShard {
RelocateShard( KeyRange const& keys, int priority ) : keys(keys), priority(priority) {}
};
enum {
SOME_SHARED = 2,
NONE_SHARED = 3
};
struct IDataDistributionTeam {
virtual vector<StorageServerInterface> getLastKnownServerInterfaces() = 0;
virtual int size() = 0;
@ -81,7 +76,6 @@ struct GetTeamRequest {
bool wantsTrueBest;
bool preferLowerUtilization;
double inflightPenalty;
std::vector<UID> sources;
std::vector<UID> completeSources;
Promise< Optional< Reference<IDataDistributionTeam> > > reply;
@ -93,10 +87,6 @@ struct GetTeamRequest {
ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest
<< " PreferLowerUtilization:" << preferLowerUtilization << " inflightPenalty:" << inflightPenalty << ";";
ss << "Sources:";
for (auto& s : sources) {
ss << s.toString() << ",";
}
ss << "CompleteSources:";
for (auto& cs : completeSources) {
ss << cs.toString() << ",";

View File

@ -54,14 +54,7 @@ struct RelocateData {
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM ||
rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM ||
rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD ||
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT ||
mergeWantsNewServers(rs.keys, rs.priority)), interval("QueuedRelocation") {}
static bool mergeWantsNewServers(KeyRangeRef keys, int priority) {
return priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD &&
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 2 ||
(SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 1 && keys.begin.startsWith(LiteralStringRef("\xff"))));
}
rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT), interval("QueuedRelocation") {}
static bool isHealthPriority(int priority) {
return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY ||
@ -946,7 +939,6 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty);
req.sources = rd.src;
req.completeSources = rd.completeSources;
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
// If a DC has no healthy team, we stop checking the other DCs until
@ -1450,7 +1442,7 @@ ACTOR Future<Void> dataDistributionQueue(
.detail( "BytesWritten", self.bytesWritten )
.detail( "PriorityRecoverMove", self.priority_relocations[SERVER_KNOBS->PRIORITY_RECOVER_MOVE] )
.detail( "PriorityRebalanceUnderutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] )
.detail( "PriorityRebalannceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] )
.detail( "PriorityRebalanceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] )
.detail( "PriorityTeamHealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_HEALTHY] )
.detail( "PriorityTeamContainsUndesiredServer", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER] )
.detail( "PriorityTeamRedundant", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT] )

View File

@ -35,6 +35,18 @@ enum BandwidthStatus {
enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
bool operator == ( ShardMetrics const& rhs ) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime && shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount) : metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
return BandwidthStatusHigh;
@ -69,7 +81,7 @@ ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstima
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<StorageMetrics>>> stats;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
struct DataDistributionTracker {
@ -106,7 +118,7 @@ struct DataDistributionTracker {
void restartShardTrackers(
DataDistributionTracker* self,
KeyRangeRef keys,
Optional<StorageMetrics> startingSize = Optional<StorageMetrics>());
Optional<ShardMetrics> startingSize = Optional<ShardMetrics>());
// Gets the permitted size and IO bounds for a shard. A shard that starts at allKeys.begin
// (i.e. '') will have a permitted size of 0, since the database can contain no data.
@ -151,8 +163,13 @@ int64_t getMaxShardSize( double dbSizeEstimate ) {
ACTOR Future<Void> trackShardBytes(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardMetrics)
Reference<AsyncVar<Optional<ShardMetrics>>> shardMetrics)
{
state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal;
state double lastLowBandwidthStartTime = shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now();
state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1;
state ReadBandwidthStatus readBandwidthStatus = shardMetrics->get().present() ? getReadBandwidthStatus(shardMetrics->get().get().metrics) : ReadBandwidthStatusNormal;
wait( delay( 0, TaskPriority::DataDistribution ) );
/*TraceEvent("TrackShardBytesStarting")
@ -162,15 +179,12 @@ ACTOR Future<Void> trackShardBytes(
.detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0)
.detail("StartingMerges", shardMetrics->get().present() ? shardMetrics->get().get().merges : 0);*/
state ReadBandwidthStatus readBandwidthStatus;
try {
loop {
ShardSizeBounds bounds;
if (shardMetrics->get().present()) {
auto bytes = shardMetrics->get().get().bytes;
auto bandwidthStatus = getBandwidthStatus(shardMetrics->get().get());
auto newReadBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get());
state ShardSizeBounds bounds;
if( shardMetrics->get().present() ) {
auto bytes = shardMetrics->get().get().metrics.bytes;
auto newReadBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics);
bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES );
bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) );
bounds.permittedError.bytes = bytes * 0.1;
@ -227,30 +241,47 @@ ACTOR Future<Void> trackShardBytes(
bounds.min.iosPerKSecond = 0;
bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity;
Transaction tr(self->cx);
StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) );
loop {
Transaction tr(self->cx);
std::pair<Optional<StorageMetrics>, int> metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, shardCount ) );
if(metrics.first.present()) {
BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics.first.get() );
if(newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) {
lastLowBandwidthStartTime = now();
}
bandwidthStatus = newBandwidthStatus;
/*TraceEvent("ShardSizeUpdate")
.detail("Keys", keys)
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwidthStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardSizePresent", shardSize->get().present())
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
.detail("TrackerID", trackerID);*/
/*TraceEvent("ShardSizeUpdate")
.detail("Keys", keys)
.detail("UpdatedSize", metrics.metrics.bytes)
.detail("Bandwidth", metrics.metrics.bytesPerKSecond)
.detail("BandwithStatus", getBandwidthStatus(metrics))
.detail("BytesLower", bounds.min.bytes)
.detail("BytesUpper", bounds.max.bytes)
.detail("BandwidthLower", bounds.min.bytesPerKSecond)
.detail("BandwidthUpper", bounds.max.bytesPerKSecond)
.detail("ShardSizePresent", shardSize->get().present())
.detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0)
.detail("TrackerID", trackerID);*/
if( shardMetrics->get().present() ) {
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardMetrics->get().get().bytes );
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate += metrics.bytes - shardMetrics->get().get().bytes;
if( shardMetrics->get().present() ) {
self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes );
if(keys.begin >= systemKeys.begin) {
self->systemSizeEstimate += metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes;
}
}
shardMetrics->set( ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount) );
break;
} else {
shardCount = metrics.second;
if(shardMetrics->get().present()) {
auto newShardMetrics = shardMetrics->get().get();
newShardMetrics.shardCount = shardCount;
shardMetrics->set( newShardMetrics );
}
}
}
shardMetrics->set( metrics );
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled)
@ -290,10 +321,10 @@ ACTOR Future<Standalone<VectorRef<KeyRef>>> getSplitKeys( DataDistributionTracke
}
}
ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<StorageMetrics>>> stats ) {
ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<ShardMetrics>>> stats ) {
loop {
if(stats->get().present())
return stats->get().get().bytes;
return stats->get().get().metrics.bytes;
wait( stats->onChange() );
}
}
@ -333,8 +364,12 @@ ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, in
return Void();
}
struct HasBeenTrueFor : NonCopyable {
explicit HasBeenTrueFor( bool value ) : trigger( value ? Void() : Future<Void>() ) {}
struct HasBeenTrueFor : ReferenceCounted<HasBeenTrueFor> {
explicit HasBeenTrueFor( Optional<ShardMetrics> value ) {
if(value.present()) {
trigger = delayJittered(std::max(0.0, SERVER_KNOBS->DD_MERGE_COALESCE_DELAY + value.get().lastLowBandwidthStartTime - now()), decrementPriority(TaskPriority::DataDistribution) ) || cleared.getFuture();
}
}
Future<Void> set() {
if( !trigger.isValid() ) {
@ -364,11 +399,11 @@ private:
ACTOR Future<Void> shardSplitter(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
ShardSizeBounds shardBounds )
{
state StorageMetrics metrics = shardSize->get().get();
state BandwidthStatus bandwidthStatus = getBandwidthStatus( shardSize->get().get() );
state StorageMetrics metrics = shardSize->get().get().metrics;
state BandwidthStatus bandwidthStatus = getBandwidthStatus( metrics );
//Split
TEST(true); // shard to be split
@ -418,17 +453,28 @@ ACTOR Future<Void> shardSplitter(
self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) );
}
self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) );
self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().metrics.bytes ) );
} else {
wait( delay(1.0, TaskPriority::DataDistribution) ); //In case the reason the split point was off was due to a discrepancy between storage servers
}
return Void();
}
ACTOR Future<Void> brokenPromiseToReady( Future<Void> f ) {
try {
wait(f);
} catch( Error &e ) {
if(e.code() != error_code_broken_promise) {
throw;
}
}
return Void();
}
Future<Void> shardMerger(
DataDistributionTracker* self,
KeyRange const& keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize )
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize )
{
int64_t maxShardSize = self->maxShardSize->get().get();
@ -442,11 +488,17 @@ Future<Void> shardMerger(
int shardsMerged = 1;
bool forwardComplete = false;
KeyRangeRef merged;
StorageMetrics endingStats = shardSize->get().get();
int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().bytes : 0;
StorageMetrics endingStats = shardSize->get().get().metrics;
int shardCount = shardSize->get().get().shardCount;
double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime;
if(FLOW_KNOBS->DELAY_JITTER_OFFSET*SERVER_KNOBS->DD_MERGE_COALESCE_DELAY > SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY && now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY) {
TraceEvent( g_network->isSimulated() ? SevError : SevWarnAlways, "ShardMergeTooSoon", self->distributorId).detail("Keys", keys).detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime);
}
int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().metrics.bytes : 0;
loop {
Optional<StorageMetrics> newMetrics;
Optional<ShardMetrics> newMetrics;
if( !forwardComplete ) {
if( nextIter->range().end == allKeys.end ) {
forwardComplete = true;
@ -456,7 +508,7 @@ Future<Void> shardMerger(
newMetrics = nextIter->value().stats->get();
// If going forward, give up when the next shard's stats are not yet present.
if( !newMetrics.present() ) {
if( !newMetrics.present() || shardCount + newMetrics.get().shardCount >= CLIENT_KNOBS->SHARD_COUNT_LIMIT ) {
--nextIter;
forwardComplete = true;
continue;
@ -468,10 +520,10 @@ Future<Void> shardMerger(
// If going backward, stop when the stats are not present or if the shard is already over the merge
// bounds. If this check triggers right away (if we have not merged anything) then return a trigger
// on the previous shard changing "size".
if( !newMetrics.present() ) {
if( !newMetrics.present() || shardCount + newMetrics.get().shardCount >= CLIENT_KNOBS->SHARD_COUNT_LIMIT ) {
if( shardsMerged == 1 ) {
TEST( true ); // shardMerger cannot merge anything
return prevIter->value().stats->onChange();
return brokenPromiseToReady( prevIter->value().stats->onChange() );
}
++prevIter;
@ -480,15 +532,18 @@ Future<Void> shardMerger(
}
merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end );
endingStats += newMetrics.get();
endingStats += newMetrics.get().metrics;
shardCount += newMetrics.get().shardCount;
lastLowBandwidthStartTime = newMetrics.get().lastLowBandwidthStartTime;
if((forwardComplete ? prevIter->range().begin : nextIter->range().begin) >= systemKeys.begin) {
systemBytes += newMetrics.get().bytes;
systemBytes += newMetrics.get().metrics.bytes;
}
shardsMerged++;
auto shardBounds = getShardSizeBounds( merged, maxShardSize );
if( endingStats.bytes >= shardBounds.min.bytes ||
getBandwidthStatus( endingStats ) != BandwidthStatusLow ||
now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY ||
shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) {
// The merged range is larger than the min bounds so we cannot continue merging in this direction.
// This means that:
@ -501,9 +556,10 @@ Future<Void> shardMerger(
break;
// If going forward, remove most recently added range
endingStats -= newMetrics.get();
endingStats -= newMetrics.get().metrics;
shardCount -= newMetrics.get().shardCount;
if(nextIter->range().begin >= systemKeys.begin) {
systemBytes -= newMetrics.get().bytes;
systemBytes -= newMetrics.get().metrics.bytes;
}
shardsMerged--;
--nextIter;
@ -519,12 +575,14 @@ Future<Void> shardMerger(
.detail("OldKeys", keys)
.detail("NewKeys", mergeRange)
.detail("EndingSize", endingStats.bytes)
.detail("BatchedMerges", shardsMerged);
.detail("BatchedMerges", shardsMerged)
.detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime)
.detail("ShardCount", shardCount);
if(mergeRange.begin < systemKeys.begin) {
self->systemSizeEstimate -= systemBytes;
}
restartShardTrackers( self, mergeRange, endingStats );
restartShardTrackers( self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthStartTime, shardCount) );
self->shardsAffectedByTeamFailure->defineShard( mergeRange );
self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) );
@ -535,8 +593,8 @@ Future<Void> shardMerger(
ACTOR Future<Void> shardEvaluator(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize,
HasBeenTrueFor *wantsToMerge)
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize,
Reference<HasBeenTrueFor> wantsToMerge)
{
Future<Void> onChange = shardSize->onChange() || yieldedFuture(self->maxShardSize->onChange());
@ -544,7 +602,7 @@ ACTOR Future<Void> shardEvaluator(
// getShardSizeBounds() will always have shardBounds.min.bytes == 0 for shards that start at allKeys.begin,
// so we will never attempt to merge that shard with the one previous.
ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get());
StorageMetrics const& stats = shardSize->get().get();
StorageMetrics const& stats = shardSize->get().get().metrics;
auto bandwidthStatus = getBandwidthStatus( stats );
bool shouldSplit = stats.bytes > shardBounds.max.bytes ||
@ -592,11 +650,8 @@ ACTOR Future<Void> shardEvaluator(
ACTOR Future<Void> shardTracker(
DataDistributionTracker* self,
KeyRange keys,
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize)
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize)
{
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
state HasBeenTrueFor wantsToMerge( shardSize->get().present() );
wait( yieldedFuture(self->readyToStart.getFuture()) );
if( !shardSize->get().present() )
@ -608,6 +663,9 @@ ACTOR Future<Void> shardTracker(
// Since maxShardSize will become present for all shards at once, avoid slow tasks with a short delay
wait( delay( 0, TaskPriority::DataDistribution ) );
// Survives multiple calls to shardEvaluator and keeps merges from happening too quickly.
state Reference<HasBeenTrueFor> wantsToMerge( new HasBeenTrueFor( shardSize->get() ) );
/*TraceEvent("ShardTracker", self->distributorId)
.detail("Begin", keys.begin)
.detail("End", keys.end)
@ -619,7 +677,7 @@ ACTOR Future<Void> shardTracker(
try {
loop {
// Use the current known size to check for (and start) splits and merges.
wait( shardEvaluator( self, keys, shardSize, &wantsToMerge ) );
wait( shardEvaluator( self, keys, shardSize, wantsToMerge ) );
// We could have a lot of actors being released from the previous wait at the same time. Immediately calling
// delay(0) mitigates the resulting SlowTask
@ -632,7 +690,7 @@ ACTOR Future<Void> shardTracker(
}
}
void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Optional<StorageMetrics> startingSize ) {
void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingSize ) {
auto ranges = self->shards.getAffectedRangesAfterInsertion( keys, ShardTrackedData() );
for(int i=0; i<ranges.size(); i++) {
if( !ranges[i].value.trackShard.isValid() && ranges[i].begin != keys.begin ) {
@ -642,7 +700,7 @@ void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Opti
continue;
}
Reference<AsyncVar<Optional<StorageMetrics>>> shardSize( new AsyncVar<Optional<StorageMetrics>>() );
Reference<AsyncVar<Optional<ShardMetrics>>> shardSize( new AsyncVar<Optional<ShardMetrics>>() );
// For the case where the new tracker will take over at the boundaries of current shard(s)
// we can use the old size if it is available. This will be the case when merging shards.
@ -697,7 +755,7 @@ ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMet
onChange = stats->onChange();
break;
}
returnMetrics += t.value().stats->get().get();
returnMetrics += t.value().stats->get().get().metrics;
}
if( !onChange.isValid() ) {

View File

@ -26,7 +26,7 @@ ServerKnobs const* SERVER_KNOBS = new ServerKnobs();
#define init( knob, value ) initKnob( knob, value, #knob )
ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimulated) {
// clang-format off
// Versions
init( VERSIONS_PER_SECOND, 1e6 );
@ -106,7 +106,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( INFLIGHT_PENALTY_HEALTHY, 1.0 );
init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 );
init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 );
init( MERGE_ONTO_NEW_TEAM, 1 ); if( randomize && BUGGIFY ) MERGE_ONTO_NEW_TEAM = deterministicRandom()->coinflip() ? 0 : 2;
init( PRIORITY_RECOVER_MOVE, 110 );
init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 );
@ -114,13 +113,13 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( PRIORITY_TEAM_HEALTHY, 140 );
init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 );
init( PRIORITY_TEAM_REDUNDANT, 200 );
init( PRIORITY_MERGE_SHARD, 340 );
init( PRIORITY_TEAM_UNHEALTHY, 700 );
init( PRIORITY_TEAM_2_LEFT, 709 );
init( PRIORITY_TEAM_1_LEFT, 800 );
init( PRIORITY_TEAM_FAILED, 805 );
init( PRIORITY_TEAM_0_LEFT, 809 );
init( PRIORITY_SPLIT_SHARD, 900 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350;
init( PRIORITY_MERGE_SHARD,                  940 ); if( randomize && BUGGIFY ) PRIORITY_MERGE_SHARD = 340;
init( PRIORITY_SPLIT_SHARD, 950 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350;
// Data distribution
init( RETRY_RELOCATESHARD_DELAY, 0.1 );
@ -190,7 +189,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( DATA_DISTRIBUTION_LOGGING_INTERVAL, 5.0 );
init( DD_ENABLED_CHECK_DELAY, 1.0 );
init( DD_STALL_CHECK_DELAY, 0.4 ); //Must be larger than 2*MAX_BUGGIFIED_DELAY
init( DD_MERGE_COALESCE_DELAY, 120.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 90.0 : 240.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0; //Because of delayJitter, this should be less than 0.9 * DD_MERGE_COALESCE_DELAY
init( DD_MERGE_COALESCE_DELAY, isSimulated ? 120.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001;
init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0;
init( STORAGE_METRICS_RANDOM_DELAY, 0.2 );
init( FREE_SPACE_RATIO_CUTOFF, 0.1 );
@ -352,7 +352,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( SAMPLE_EXPIRATION_TIME, 1.0 );
init( SAMPLE_POLL_TIME, 0.1 );
init( RESOLVER_STATE_MEMORY_LIMIT, 1e6 );
init( LAST_LIMITED_RATIO, 0.6 );
init( LAST_LIMITED_RATIO, 2.0 );
//Cluster Controller
init( CLUSTER_CONTROLLER_LOGGING_DELAY, 5.0 );
@ -488,6 +488,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( BYTE_SAMPLE_LOAD_DELAY, 0.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_LOAD_DELAY = 0.1;
init( BYTE_SAMPLE_START_DELAY, 1.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_START_DELAY = 0.0;
init( UPDATE_STORAGE_PROCESS_STATS_INTERVAL, 5.0 );
init( BEHIND_CHECK_DELAY, 2.0 );
init( BEHIND_CHECK_COUNT, 2 );
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, 0.1 );
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;
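Editor's note: each `init(...)` above sets a knob's default, and the optional `if( randomize && BUGGIFY ) ...` tail lets simulation runs perturb the value to shake out timing bugs. A toy sketch of that pattern is below; the KnobsSketch type, setter names, and random source are illustrative, not the actual initKnob machinery.

#include <cstdlib>
#include <map>
#include <string>

// A toy knob table: a named double with a default that simulation may override.
struct KnobsSketch {
    std::map<std::string, double> values;
    bool randomize = false;      // corresponds to constructing the knobs with randomize=true
    bool buggifyEnabled = false; // corresponds to BUGGIFY being active in simulation

    void init(const std::string& name, double defaultValue) { values[name] = defaultValue; }

    // Apply a BUGGIFY-style override with 50% probability when randomization is on.
    void maybeBuggify(const std::string& name, double buggifiedValue) {
        if (randomize && buggifyEnabled && (std::rand() & 1)) values[name] = buggifiedValue;
    }
};

int main() {
    KnobsSketch knobs;
    knobs.randomize = true;
    knobs.buggifyEnabled = true;
    knobs.init("DD_MERGE_COALESCE_DELAY", 300.0);
    knobs.maybeBuggify("DD_MERGE_COALESCE_DELAY", 0.001); // shrink delays to stress merge logic
    return 0;
}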

View File

@ -106,8 +106,7 @@ public:
double INFLIGHT_PENALTY_REDUNDANT;
double INFLIGHT_PENALTY_UNHEALTHY;
double INFLIGHT_PENALTY_ONE_LEFT;
int MERGE_ONTO_NEW_TEAM; // Merges will request new servers. 0 for off, 1 for \xff only, 2 for all shards.
// Higher priorities are executed first
// Priority/100 is the "priority group"/"superpriority". Priority inversion
// is possible within but not between priority groups; fewer priority groups
@ -119,12 +118,12 @@ public:
int PRIORITY_TEAM_HEALTHY;
int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER;
int PRIORITY_TEAM_REDUNDANT;
int PRIORITY_MERGE_SHARD;
int PRIORITY_TEAM_UNHEALTHY;
int PRIORITY_TEAM_2_LEFT;
int PRIORITY_TEAM_1_LEFT;
int PRIORITY_TEAM_FAILED; // Priority when a server in the team is excluded as failed
int PRIORITY_TEAM_0_LEFT;
int PRIORITY_MERGE_SHARD;
int PRIORITY_SPLIT_SHARD;
// Data distribution
@ -151,6 +150,7 @@ public:
double DATA_DISTRIBUTION_LOGGING_INTERVAL;
double DD_ENABLED_CHECK_DELAY;
double DD_STALL_CHECK_DELAY;
double DD_LOW_BANDWIDTH_DELAY;
double DD_MERGE_COALESCE_DELAY;
double STORAGE_METRICS_POLLING_DELAY;
double STORAGE_METRICS_RANDOM_DELAY;
@ -428,6 +428,10 @@ public:
double BYTE_SAMPLE_LOAD_DELAY;
double BYTE_SAMPLE_START_DELAY;
double UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
double BEHIND_CHECK_DELAY;
int BEHIND_CHECK_COUNT;
int64_t BEHIND_CHECK_VERSIONS;
double WAIT_METRICS_WRONG_SHARD_CHANCE;
//Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;
@ -474,7 +478,7 @@ public:
int64_t FASTRESTORE_HEARTBEAT_INTERVAL;
double FASTRESTORE_SAMPLING_PERCENT;
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL);
ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);
};
extern ServerKnobs const* SERVER_KNOBS;

View File

@ -217,6 +217,19 @@ struct LogSystemConfig {
return format("type: %d oldGenerations: %d tags: %d %s", logSystemType, oldTLogs.size(), logRouterTags, describe(tLogs).c_str());
}
Optional<Key> getRemoteDcId() const {
for( int i = 0; i < tLogs.size(); i++ ) {
if(!tLogs[i].isLocal) {
for( int j = 0; j < tLogs[i].tLogs.size(); j++ ) {
if( tLogs[i].tLogs[j].present() ) {
return tLogs[i].tLogs[j].interf().locality.dcId();
}
}
}
}
return Optional<Key>();
}
std::vector<TLogInterface> allLocalLogs(bool includeSatellite = true) const {
std::vector<TLogInterface> results;
for( int i = 0; i < tLogs.size(); i++ ) {

View File

@ -219,8 +219,6 @@ struct ProxyCommitData {
NotifiedVersion latestLocalCommitBatchResolving;
NotifiedVersion latestLocalCommitBatchLogging;
PromiseStream<Void> commitBatchStartNotifications;
PromiseStream<Future<GetCommitVersionReply>> commitBatchVersions; // 1:1 with commitBatchStartNotifications
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit;
Database cx;
@ -432,7 +430,6 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
}
if(!batch.size()) {
commitData->commitBatchStartNotifications.send(Void());
if(now() - lastBatch > commitData->commitBatchInterval) {
timeout = delayJittered(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, TaskPriority::ProxyCommitBatcher);
}
@ -444,7 +441,6 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
if((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) {
out.send({ batch, batchBytes });
lastBatch = now();
commitData->commitBatchStartNotifications.send(Void());
timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher);
batch = std::vector<CommitTransactionRequest>();
batchBytes = 0;
@ -508,7 +504,7 @@ ACTOR Future<Void> addBackupMutations(ProxyCommitData* self, std::map<Key, Mutat
while(blobIter) {
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield());
wait(yield(TaskPriority::ProxyCommitYield2));
}
valueWriter.serializeBytes(blobIter->data);
yieldBytes += blobIter->data.size();
@ -602,21 +598,16 @@ ACTOR Future<Void> commitBatch(
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
if (trs.empty()) {
// We are sending an empty batch, so we have to trigger the version fetcher
self->commitBatchStartNotifications.send(Void());
}
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
wait(yield());
wait(yield(TaskPriority::ProxyCommitYield1));
if (debugID.present())
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
Future<GetCommitVersionReply> fVersionReply = waitNext(self->commitBatchVersions.getFuture());
GetCommitVersionReply versionReply = wait(fVersionReply);
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
self->mostRecentProcessedRequestNumber = versionReply.requestNum;
self->stats.txnCommitVersionAssigned += trs.size();
@ -674,7 +665,7 @@ ACTOR Future<Void> commitBatch(
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber-1); // Queuing post-resolution commit processing
wait(self->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber-1));
wait(yield());
wait(yield(TaskPriority::ProxyCommitYield2));
self->stats.txnCommitResolved += trs.size();
@ -832,7 +823,7 @@ ACTOR Future<Void> commitBatch(
for (; mutationNum < pMutations->size(); mutationNum++) {
if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) {
yieldBytes = 0;
wait(yield());
wait(yield(TaskPriority::ProxyCommitYield2));
}
auto& m = (*pMutations)[mutationNum];
@ -1014,7 +1005,7 @@ ACTOR Future<Void> commitBatch(
}
self->lastCommitLatency = now()-commitStartTime;
self->lastCommitTime = std::max(self->lastCommitTime.get(), commitStartTime);
wait(yield());
wait(yield(TaskPriority::ProxyCommitYield3));
if( self->popRemoteTxs && msg.popTo > ( self->txsPopVersions.size() ? self->txsPopVersions.back().second : self->lastTxsPop ) ) {
if(self->txsPopVersions.size() >= SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) {
@ -1162,14 +1153,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
return rep;
}
ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
loop {
waitNext(commitData->commitBatchStartNotifications.getFuture());
GetCommitVersionRequest req(commitData->commitVersionRequestNumber++, commitData->mostRecentProcessedRequestNumber, commitData->dbgid);
commitData->commitBatchVersions.send(brokenPromiseToNever(commitData->master.getCommitVersion.getReply(req)));
}
}
struct TransactionRateInfo {
double rate;
double limit;
@ -1661,7 +1644,6 @@ ACTOR Future<Void> masterProxyServerCore(
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
addActor.send( fetchVersions(&commitData) );
addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
//TraceEvent("ProxyInit1", proxy.id());

View File

@ -2301,9 +2301,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
wait(logData->committingQueue.getFuture() || logData->removed );
} catch( Error &e ) {
if(e.code() != error_code_actor_cancelled) {
req.reply.sendError(e);
}
req.reply.sendError(recruitment_failed());
if( e.code() != error_code_worker_removed ) {
throw;

View File

@ -182,6 +182,7 @@ struct RatekeeperData {
RatekeeperLimits batchLimits;
Deque<double> actualTpsHistory;
Optional<Key> remoteDC;
RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
@ -384,7 +385,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
// Look at each storage server's write queue and local rate, compute and store the desired rate ratio
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
auto& ss = i->value;
if (!ss.valid) continue;
if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) continue;
++sscount;
limitReason_t ssLimitReason = limitReason_t::unlimited;
@ -537,7 +538,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
Version minLimitingSSVer = std::numeric_limits<Version>::max();
for (const auto& it : self->storageQueueInfo) {
auto& ss = it.value;
if (!ss.valid) continue;
if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) continue;
minSSVer = std::min(minSSVer, ss.lastReply.version);
@ -741,6 +742,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
for( int i = 0; i < tlogInterfs.size(); i++ )
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();
try {
state bool lastLimited = false;
loop choose {
@ -797,6 +800,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
for( int i = 0; i < tlogInterfs.size(); i++ )
tlogTrackers.push_back( splitError( trackTLogQueueInfo(&self, tlogInterfs[i]), err ) );
}
self.remoteDC = dbInfo->get().logSystemConfig.getRemoteDcId();
}
when ( wait(collection) ) {
ASSERT(false);
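Editor's note: the ratekeeper changes above record the remote datacenter's id from the log system configuration and skip storage servers in that DC when computing queue-based limits. A minimal sketch of that filter, using std::optional/std::string in place of Optional<Key> and the locality types (names are stand-ins):

#include <optional>
#include <string>
#include <vector>

struct StorageQueueInfoSketch {
    bool valid = true;
    std::optional<std::string> dcId;  // stand-in for ss.locality.dcId()
    long long queueBytes = 0;
};

// Only primary-DC storage servers should drive ratekeeper throttling;
// remote-DC replicas catch up asynchronously and are excluded here.
long long worstPrimaryQueueBytes(const std::vector<StorageQueueInfoSketch>& servers,
                                 const std::optional<std::string>& remoteDC) {
    long long worst = 0;
    for (const auto& ss : servers) {
        if (!ss.valid) continue;
        if (remoteDC && ss.dcId == remoteDC) continue;  // skip remote-DC replicas
        if (ss.queueBytes > worst) worst = ss.queueBytes;
    }
    return worst;
}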

View File

@ -2756,9 +2756,7 @@ ACTOR Future<Void> tLogStart( TLogData* self, InitializeTLogRequest req, Localit
}
wait(logData->committingQueue.getFuture() || logData->removed );
} catch( Error &e ) {
if(e.code() != error_code_actor_cancelled) {
req.reply.sendError(e);
}
req.reply.sendError(recruitment_failed());
if( e.code() != error_code_worker_removed ) {
throw;

View File

@ -436,7 +436,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
Standalone<StringRef> msg = data.getMessages(location);
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::TLogCommitReply ) );
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);

View File

@ -1143,10 +1143,8 @@ private:
}
case OPT_TRACECLOCK: {
const char* a = args.OptionArg();
if (!strcmp(a, "realtime"))
g_trace_clock = TRACE_CLOCK_REALTIME;
else if (!strcmp(a, "now"))
g_trace_clock = TRACE_CLOCK_NOW;
if (!strcmp(a, "realtime")) g_trace_clock.store(TRACE_CLOCK_REALTIME);
else if (!strcmp(a, "now")) g_trace_clock.store(TRACE_CLOCK_NOW);
else {
fprintf(stderr, "ERROR: Unknown clock source `%s'\n", a);
printHelpTeaser(argv[0]);
@ -1538,7 +1536,7 @@ int main(int argc, char* argv[]) {
delete CLIENT_KNOBS;
FlowKnobs* flowKnobs = new FlowKnobs(true, role == Simulation);
ClientKnobs* clientKnobs = new ClientKnobs(true);
ServerKnobs* serverKnobs = new ServerKnobs(true, clientKnobs);
ServerKnobs* serverKnobs = new ServerKnobs(true, clientKnobs, role == Simulation);
FLOW_KNOBS = flowKnobs;
SERVER_KNOBS = serverKnobs;
CLIENT_KNOBS = clientKnobs;

View File

@ -434,6 +434,7 @@ public:
bool shuttingDown;
bool behind;
bool versionBehind;
bool debug_inApplyUpdate;
double debug_lastValidateTime;
@ -530,7 +531,7 @@ public:
shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0),
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
{
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
@ -765,7 +766,7 @@ ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
else if (version <= data->version.get())
return version;
if(data->behind && version > data->version.get()) {
if((data->behind || data->versionBehind) && version > data->version.get()) {
throw process_behind();
}
@ -3419,9 +3420,18 @@ ACTOR Future<Void> waitMetrics( StorageServerMetrics* self, WaitMetricsRequest r
break;
}
if ( timedout || !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
TEST( !timedout ); // ShardWaitMetrics return case 2 (delayed)
TEST( timedout ); // ShardWaitMetrics return on timeout
if( timedout ) {
TEST( true ); // ShardWaitMetrics return on timeout
if(deterministicRandom()->random01() < SERVER_KNOBS->WAIT_METRICS_WRONG_SHARD_CHANCE) {
req.reply.sendError( wrong_shard_server() );
} else {
req.reply.send( metrics );
}
break;
}
if ( !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) {
TEST( true ); // ShardWaitMetrics return case 2 (delayed)
req.reply.send( metrics );
break;
}
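Editor's note: in the waitMetrics change above, a request that times out now has a small WAIT_METRICS_WRONG_SHARD_CHANCE of being answered with wrong_shard_server(), which pushes the client to re-resolve its shard-to-server mapping rather than keep waiting on a server that may no longer own the range. A stripped-down sketch of that reply decision (enum and parameter names are placeholders):

#include <random>

enum class MetricsReply { SendMetrics, SendWrongShardServer };

// Decide how to answer a waitMetrics request that hit its timeout: usually send the
// current metrics, but occasionally tell the client to re-resolve the shard location.
MetricsReply onWaitMetricsTimeout(std::mt19937& rng, double wrongShardChance) {
    std::uniform_real_distribution<double> uniform(0.0, 1.0);
    return uniform(rng) < wrongShardChance ? MetricsReply::SendWrongShardServer
                                           : MetricsReply::SendMetrics;
}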
@ -3510,6 +3520,28 @@ ACTOR Future<Void> logLongByteSampleRecovery(Future<Void> recovery) {
return Void();
}
ACTOR Future<Void> checkBehind( StorageServer* self ) {
state int behindCount = 0;
loop {
wait( delay(SERVER_KNOBS->BEHIND_CHECK_DELAY) );
state Transaction tr(self->cx);
loop {
try {
Version readVersion = wait( tr.getRawReadVersion() );
if( readVersion > self->version.get() + SERVER_KNOBS->BEHIND_CHECK_VERSIONS ) {
behindCount++;
} else {
behindCount = 0;
}
self->versionBehind = behindCount >= SERVER_KNOBS->BEHIND_CHECK_COUNT;
break;
} catch( Error &e ) {
wait(tr.onError(e));
}
}
}
}
ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterface ssi )
{
state Future<Void> doUpdate = Void();
@ -3526,6 +3558,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
actors.add(self->otherError.getFuture());
actors.add(metricsCore(self, ssi));
actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
actors.add(checkBehind(self));
self->coreStarted.send( Void() );
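Editor's note: the new checkBehind actor above periodically compares a fresh cluster read version with the storage server's own version and only sets versionBehind after BEHIND_CHECK_COUNT consecutive checks exceed BEHIND_CHECK_VERSIONS, so one slow sample does not flip the flag. The debouncing idea, stripped of the transaction and actor machinery, looks roughly like this (names and thresholds are placeholders):

#include <cstdint>

// Tracks consecutive "too far behind" observations and only reports behind
// after `requiredCount` of them in a row, mirroring BEHIND_CHECK_COUNT.
class BehindDetector {
public:
    BehindDetector(int64_t versionThreshold, int requiredCount)
        : threshold(versionThreshold), required(requiredCount) {}

    // Feed one observation: the cluster's read version vs. this server's version.
    bool observe(int64_t clusterVersion, int64_t localVersion) {
        if (clusterVersion > localVersion + threshold) ++consecutive;
        else consecutive = 0;
        return consecutive >= required;   // true => treat reads as process_behind
    }

private:
    int64_t threshold;
    int required;
    int consecutive = 0;
};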

View File

@ -69,15 +69,23 @@ extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store);
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<ServerDBInfo>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
state std::vector<UID> lastProxyUIDs;
state std::vector<MasterProxyInterface> lastProxies;
loop {
info->set( db->get().client );
ClientDBInfo ni = db->get().client;
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
info->set( ni );
wait( db->onChange() );
}
}
ACTOR static Future<Void> extractClientInfo( Reference<AsyncVar<CachedSerialization<ServerDBInfo>>> db, Reference<AsyncVar<ClientDBInfo>> info ) {
state std::vector<UID> lastProxyUIDs;
state std::vector<MasterProxyInterface> lastProxies;
loop {
info->set( db->get().read().client );
ClientDBInfo ni = db->get().read().client;
shrinkProxyList(ni, lastProxyUIDs, lastProxies);
info->set( ni );
wait( db->onChange() );
}
}
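Editor's note: both extractClientInfo loops above now pass the incoming ClientDBInfo through shrinkProxyList together with the last seen proxy UIDs and interfaces before publishing it. A plausible reading (a hypothetical sketch only, not the actual helper) is that the client-facing proxy list is reduced to a bounded, stable subset so it does not churn on every ServerDBInfo change:

#include <algorithm>
#include <cstdint>
#include <vector>

struct ProxySketch { uint64_t uid; /* interface details elided */ };

// Hypothetical shrink step: keep the previous subset if the proxy membership is
// unchanged, otherwise take the first maxProxies entries and remember them.
void shrinkProxyListSketch(std::vector<ProxySketch>& proxies,
                           std::vector<uint64_t>& lastUIDs,
                           std::vector<ProxySketch>& lastSubset,
                           size_t maxProxies) {
    std::vector<uint64_t> uids;
    for (const auto& p : proxies) uids.push_back(p.uid);
    std::sort(uids.begin(), uids.end());
    if (uids == lastUIDs && !lastSubset.empty()) {
        proxies = lastSubset;           // membership unchanged: reuse the cached subset
        return;
    }
    if (proxies.size() > maxProxies) proxies.resize(maxProxies);
    lastUIDs = std::move(uids);
    lastSubset = proxies;
}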

View File

@ -74,6 +74,8 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
init( TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT, 11.0 );
init( NETWORK_TEST_REPLY_SIZE, 600e3 );

View File

@ -91,9 +91,11 @@ public:
int USE_OBJECT_SERIALIZER;
int TLS_CERT_REFRESH_DELAY_SECONDS;
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
double TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT;
int NETWORK_TEST_REPLY_SIZE;
//AsyncFileCached
int64_t PAGE_CACHE_4K;
int64_t PAGE_CACHE_64K;

View File

@ -167,7 +167,6 @@ public:
uint64_t numYields;
double lastPriorityTrackTime;
TaskPriority lastMinTaskID;
std::priority_queue<OrderedTask, std::vector<OrderedTask>> ready;
@ -521,7 +520,7 @@ Net2::Net2(bool useThreadPool, bool useMetrics)
int priBins[] = { 1, 2050, 3050, 4050, 4950, 5050, 7050, 8050, 10050 };
static_assert( sizeof(priBins) == sizeof(int)*NetworkMetrics::PRIORITY_BINS, "Fix priority bins");
for(int i=0; i<NetworkMetrics::PRIORITY_BINS; i++)
networkMetrics.priorityBins[i] = static_cast<TaskPriority>(priBins[i]);
networkInfo.metrics.priorityBins[i] = static_cast<TaskPriority>(priBins[i]);
updateNow();
}
@ -737,22 +736,21 @@ void Net2::run() {
void Net2::trackMinPriority( TaskPriority minTaskID, double now ) {
if (minTaskID != lastMinTaskID) {
for(int c=0; c<NetworkMetrics::PRIORITY_BINS; c++) {
TaskPriority pri = networkMetrics.priorityBins[c];
TaskPriority pri = networkInfo.metrics.priorityBins[c];
if (pri > minTaskID && pri <= lastMinTaskID) { // busy -> idle
double busyFor = lastPriorityTrackTime - networkMetrics.priorityTimer[c];
networkMetrics.priorityBlocked[c] = false;
networkMetrics.priorityBlockedDuration[c] += busyFor;
networkMetrics.secSquaredPriorityBlocked[c] += busyFor * busyFor;
networkInfo.metrics.priorityBlocked[c] = false;
networkInfo.metrics.priorityBlockedDuration[c] += now - networkInfo.metrics.windowedPriorityTimer[c];
networkInfo.metrics.priorityMaxBlockedDuration[c] = std::max(networkInfo.metrics.priorityMaxBlockedDuration[c], now - networkInfo.metrics.priorityTimer[c]);
}
if (pri <= minTaskID && pri > lastMinTaskID) { // idle -> busy
networkMetrics.priorityBlocked[c] = true;
networkMetrics.priorityTimer[c] = now;
networkInfo.metrics.priorityBlocked[c] = true;
networkInfo.metrics.priorityTimer[c] = now;
networkInfo.metrics.windowedPriorityTimer[c] = now;
}
}
}
lastMinTaskID = minTaskID;
lastPriorityTrackTime = now;
}
void Net2::processThreadReady() {
@ -772,7 +770,7 @@ void Net2::checkForSlowTask(int64_t tscBegin, int64_t tscEnd, double duration, T
int64_t elapsed = tscEnd-tscBegin;
if (elapsed > FLOW_KNOBS->TSC_YIELD_TIME && tscBegin > 0) {
int i = std::min<double>(NetworkMetrics::SLOW_EVENT_BINS-1, log( elapsed/1e6 ) / log(2.));
++networkMetrics.countSlowEvents[i];
++networkInfo.metrics.countSlowEvents[i];
int64_t warnThreshold = g_network->isSimulated() ? 10e9 : 500e6;
//printf("SlowTask: %d, %d yields\n", (int)(elapsed/1e6), numYields);

View File

@ -133,12 +133,12 @@ std::string removeWhitespace(const std::string &t)
if (found != std::string::npos)
str.erase(found + 1);
else
str.clear(); // str is all whitespace
str.clear(); // str is all whitespace
found = str.find_first_not_of(ws);
if (found != std::string::npos)
str.erase(0, found);
else
str.clear(); // str is all whitespace
str.clear(); // str is all whitespace
return str;
}
@ -2805,6 +2805,8 @@ extern volatile bool net2backtraces_overflow;
extern volatile int64_t net2backtraces_count;
extern std::atomic<int64_t> net2liveness;
extern void initProfiling();
std::atomic<double> checkThreadTime;
#endif
volatile thread_local bool profileThread = false;
@ -2852,7 +2854,9 @@ void profileHandler(int sig) {
// We are casting away the volatile-ness of the backtrace array, but we believe that should be reasonably safe in the signal handler
ProfilingSample* ps = const_cast<ProfilingSample*>((volatile ProfilingSample*)(net2backtraces + net2backtraces_offset));
ps->timestamp = timer();
// We can only read the check thread time in a signal handler if the atomic is lock free.
// We can't get the time from a timer() call because it's not signal safe.
ps->timestamp = checkThreadTime.is_lock_free() ? checkThreadTime.load() : 0;
// SOMEDAY: should we limit the maximum number of frames from backtrace beyond just available space?
size_t size = backtrace(ps->frames, net2backtraces_max - net2backtraces_offset - 2);
@ -2899,6 +2903,7 @@ void* checkThread(void *arg) {
}
lastSignal = t;
checkThreadTime.store(lastSignal);
pthread_kill(mainThread, SIGPROF);
}
}
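Editor's note: the profiler change above stops calling timer() inside the SIGPROF handler and instead has the watchdog thread publish its last check time into a std::atomic<double>, which the handler reads only when the atomic is lock-free (a lock-based fallback would not be async-signal-safe). A minimal standalone version of that handshake (names are stand-ins):

#include <atomic>
#include <cstdio>

// Written by a normal watchdog thread, read from a signal handler.
static std::atomic<double> lastCheckTime{0.0};

// Called from the watchdog thread before it raises the profiling signal.
void recordCheckTime(double now) { lastCheckTime.store(now); }

// Called from the signal handler: only read the atomic when it is lock-free,
// otherwise fall back to a sentinel value.
double readCheckTimeFromSignalHandler() {
    return lastCheckTime.is_lock_free() ? lastCheckTime.load() : 0.0;
}

int main() {
    recordCheckTime(123.456);
    std::printf("%.3f\n", readCheckTimeFromSignalHandler());
    return 0;
}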

View File

@ -95,8 +95,8 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
.detail("MachineID", machineState.machineId)
.detail("AIOSubmitCount", netData.countAIOSubmit - statState->networkState.countAIOSubmit)
.detail("AIOCollectCount", netData.countAIOCollect - statState->networkState.countAIOCollect)
.detail("AIOSubmitLag", (g_network->networkMetrics.secSquaredSubmit - statState->networkMetricsState.secSquaredSubmit) / currentStats.elapsed)
.detail("AIODiskStall", (g_network->networkMetrics.secSquaredDiskStall - statState->networkMetricsState.secSquaredDiskStall) / currentStats.elapsed)
.detail("AIOSubmitLag", (g_network->networkInfo.metrics.secSquaredSubmit - statState->networkMetricsState.secSquaredSubmit) / currentStats.elapsed)
.detail("AIODiskStall", (g_network->networkInfo.metrics.secSquaredDiskStall - statState->networkMetricsState.secSquaredDiskStall) / currentStats.elapsed)
.detail("CurrentConnections", netData.countConnEstablished - netData.countConnClosedWithError - netData.countConnClosedWithoutError)
.detail("ConnectionsEstablished", (double) (netData.countConnEstablished - statState->networkState.countConnEstablished) / currentStats.elapsed)
.detail("ConnectionsClosed", ((netData.countConnClosedWithError - statState->networkState.countConnClosedWithError) + (netData.countConnClosedWithoutError - statState->networkState.countConnClosedWithoutError)) / currentStats.elapsed)
@ -142,23 +142,22 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
.detail("ReactTime", netData.countReactTime - statState->networkState.countReactTime);
for (int i = 0; i<NetworkMetrics::SLOW_EVENT_BINS; i++) {
if (int c = g_network->networkMetrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) {
if (int c = g_network->networkInfo.metrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) {
n.detail(format("SlowTask%dM", 1 << i).c_str(), c);
}
}
for (int i = 0; i < NetworkMetrics::PRIORITY_BINS && g_network->networkMetrics.priorityBins[i] != TaskPriority::Zero; i++) {
if(g_network->networkMetrics.priorityBlocked[i]) {
double lastSegment = std::min(currentStats.elapsed, now() - g_network->networkMetrics.priorityTimer[i]);
g_network->networkMetrics.priorityBlockedDuration[i] += lastSegment;
g_network->networkMetrics.secSquaredPriorityBlocked[i] += lastSegment * lastSegment;
g_network->networkMetrics.priorityTimer[i] = now();
for (int i = 0; i < NetworkMetrics::PRIORITY_BINS && g_network->networkInfo.metrics.priorityBins[i] != TaskPriority::Zero; i++) {
if(g_network->networkInfo.metrics.priorityBlocked[i]) {
g_network->networkInfo.metrics.priorityBlockedDuration[i] += now() - g_network->networkInfo.metrics.windowedPriorityTimer[i];
g_network->networkInfo.metrics.priorityMaxBlockedDuration[i] = std::max(g_network->networkInfo.metrics.priorityMaxBlockedDuration[i], now() - g_network->networkInfo.metrics.priorityTimer[i]);
g_network->networkInfo.metrics.windowedPriorityTimer[i] = now();
}
double blocked = g_network->networkMetrics.priorityBlockedDuration[i] - statState->networkMetricsState.priorityBlockedDuration[i];
double s2Blocked = g_network->networkMetrics.secSquaredPriorityBlocked[i] - statState->networkMetricsState.secSquaredPriorityBlocked[i];
n.detail(format("PriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), blocked);
n.detail(format("SumOfSquaredPriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), s2Blocked);
n.detail(format("PriorityBusy%d", g_network->networkInfo.metrics.priorityBins[i]).c_str(), std::min(currentStats.elapsed, g_network->networkInfo.metrics.priorityBlockedDuration[i] - statState->networkMetricsState.priorityBlockedDuration[i]));
n.detail(format("PriorityMaxBusy%d", g_network->networkInfo.metrics.priorityBins[i]).c_str(), g_network->networkInfo.metrics.priorityMaxBlockedDuration[i]);
g_network->networkInfo.metrics.priorityMaxBlockedDuration[i] = 0;
}
n.trackLatest("NetworkMetrics");
@ -288,7 +287,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
}
}
#endif
statState->networkMetricsState = g_network->networkMetrics;
statState->networkMetricsState = g_network->networkInfo.metrics;
statState->networkState = netData;
return currentStats;
}

View File

@ -113,7 +113,7 @@ struct SuppressionMap {
};
TraceBatch g_traceBatch;
thread_local trace_clock_t g_trace_clock = TRACE_CLOCK_REALTIME;
std::atomic<trace_clock_t> g_trace_clock{ TRACE_CLOCK_NOW };
LatestEventCache latestEventCache;
SuppressionMap suppressedEvents;
@ -422,7 +422,7 @@ public:
TraceEventFields rolledFields;
for(auto itr = events[idx].begin(); itr != events[idx].end(); ++itr) {
if(itr->first == "Time") {
rolledFields.addField("Time", format("%.6f", (g_trace_clock == TRACE_CLOCK_NOW) ? now() : timer()));
rolledFields.addField("Time", format("%.6f", TraceEvent::getCurrentTime()));
rolledFields.addField("OriginalTime", itr->second);
}
else if(itr->first == "TrackLatestType") {
@ -724,26 +724,12 @@ bool TraceEvent::init() {
if(enabled) {
tmpEventMetric = new DynamicEventMetric(MetricNameRef());
double time;
if(g_trace_clock == TRACE_CLOCK_NOW) {
if(!g_network) {
static double preNetworkTime = timer_monotonic();
time = preNetworkTime;
}
else {
time = now();
}
}
else {
time = timer();
}
if(err.isValid() && err.isInjectedFault() && severity == SevError) {
severity = SevWarnAlways;
}
detail("Severity", int(severity));
detailf("Time", "%.6f", time);
detailf("Time", "%.6f", getCurrentTime());
detail("Type", type);
if(g_network && g_network->isSimulated()) {
NetworkAddress local = g_network->getLocalAddress();
@ -994,13 +980,26 @@ thread_local bool TraceEvent::networkThread = false;
void TraceEvent::setNetworkThread() {
traceEventThrottlerCache = new TransientThresholdMetricSample<Standalone<StringRef>>(FLOW_KNOBS->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT);
networkThread = true;
g_trace_clock = TRACE_CLOCK_NOW;
}
bool TraceEvent::isNetworkThread() {
return networkThread;
}
double TraceEvent::getCurrentTime() {
if(g_trace_clock.load() == TRACE_CLOCK_NOW) {
if(!isNetworkThread() || !g_network) {
return timer_monotonic();
}
else {
return now();
}
}
else {
return timer();
}
}
TraceInterval& TraceInterval::begin() {
pairID = nondeterministicRandom()->randomUniqueID();
count = 0;
@ -1008,20 +1007,20 @@ TraceInterval& TraceInterval::begin() {
}
void TraceBatch::addEvent( const char *name, uint64_t id, const char *location ) {
eventBatch.push_back( EventInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), name, id, location));
eventBatch.push_back( EventInfo(TraceEvent::getCurrentTime(), name, id, location));
if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP )
dump();
}
void TraceBatch::addAttach( const char *name, uint64_t id, uint64_t to ) {
attachBatch.push_back( AttachInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), name, id, to));
attachBatch.push_back( AttachInfo(TraceEvent::getCurrentTime(), name, id, to));
if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP )
dump();
}
void TraceBatch::addBuggify( int activated, int line, std::string file ) {
if( g_network ) {
buggifyBatch.push_back( BuggifyInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), activated, line, file));
buggifyBatch.push_back( BuggifyInfo(TraceEvent::getCurrentTime(), activated, line, file));
if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP )
dump();
} else {

View File

@ -22,6 +22,7 @@
#define FLOW_TRACE_H
#pragma once
#include <atomic>
#include <stdarg.h>
#include <stdint.h>
#include <string>
@ -381,6 +382,8 @@ struct TraceEvent {
static void setNetworkThread();
static bool isNetworkThread();
static double getCurrentTime();
//Must be called directly after constructing the trace event
TraceEvent& error(const class Error& e, bool includeCancelled=false) {
if (enabled) {
@ -572,7 +575,7 @@ void addTraceRole(std::string role);
void removeTraceRole(std::string role);
enum trace_clock_t { TRACE_CLOCK_NOW, TRACE_CLOCK_REALTIME };
extern thread_local trace_clock_t g_trace_clock;
extern std::atomic<trace_clock_t> g_trace_clock;
extern TraceBatch g_traceBatch;
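Editor's note: the clock-selection flag above is now a std::atomic<trace_clock_t> rather than a thread_local, so a single store (for example from the --traceclock option handler) is observed by every thread that formats a timestamp. A tiny sketch of that single-writer/many-reader pattern with a stand-in enum:

#include <atomic>

enum TraceClockSketch { CLOCK_NOW, CLOCK_REALTIME };

// One writer flips the mode (e.g. from command-line parsing); any thread may read it.
static std::atomic<TraceClockSketch> g_clockSketch{ CLOCK_NOW };

void selectRealtimeClock() { g_clockSketch.store(CLOCK_REALTIME); }

bool useMonotonicNow() { return g_clockSketch.load() == CLOCK_NOW; }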
#define DUMPTOKEN(name) \

View File

@ -59,9 +59,14 @@ enum class TaskPriority {
TLogCommitReply = 8580,
TLogCommit = 8570,
ProxyGetRawCommittedVersion = 8565,
ProxyResolverReply = 8560,
ProxyCommitBatcher = 8550,
ProxyCommit = 8540,
ProxyCommitYield3 = 8562,
ProxyTLogCommitReply = 8560,
ProxyCommitYield2 = 8557,
ProxyResolverReply = 8555,
ProxyMasterVersionReply = 8550,
ProxyCommitYield1 = 8547,
ProxyCommit = 8545,
ProxyCommitBatcher = 8540,
TLogConfirmRunningReply = 8530,
TLogConfirmRunning = 8520,
ProxyGRVTimer = 8510,
@ -300,24 +305,31 @@ template <class T> class Promise;
struct NetworkMetrics {
enum { SLOW_EVENT_BINS = 16 };
uint64_t countSlowEvents[SLOW_EVENT_BINS];
uint64_t countSlowEvents[SLOW_EVENT_BINS] = {};
enum { PRIORITY_BINS = 9 };
TaskPriority priorityBins[ PRIORITY_BINS ];
bool priorityBlocked[PRIORITY_BINS];
double priorityBlockedDuration[PRIORITY_BINS];
double secSquaredPriorityBlocked[PRIORITY_BINS];
double priorityTimer[PRIORITY_BINS];
TaskPriority priorityBins[PRIORITY_BINS] = {};
bool priorityBlocked[PRIORITY_BINS] = {};
double priorityBlockedDuration[PRIORITY_BINS] = {};
double priorityMaxBlockedDuration[PRIORITY_BINS] = {};
double priorityTimer[PRIORITY_BINS] = {};
double windowedPriorityTimer[PRIORITY_BINS] = {};
double oldestAlternativesFailure;
double newestAlternativesFailure;
double lastAlternativesFailureSkipDelay;
double lastSync;
double secSquaredSubmit = 0;
double secSquaredDiskStall = 0;
double secSquaredSubmit;
double secSquaredDiskStall;
NetworkMetrics() {}
};
NetworkMetrics() { memset(this, 0, sizeof(*this)); }
struct NetworkInfo {
NetworkMetrics metrics;
double oldestAlternativesFailure = 0;
double newestAlternativesFailure = 0;
double lastAlternativesFailureSkipDelay = 0;
std::map<std::pair<IPAddress, uint16_t>, double> serverTLSConnectionThrottler;
NetworkInfo() {}
};
class IEventFD : public ReferenceCounted<IEventFD> {
@ -468,7 +480,7 @@ public:
return (netAddressesFuncPtr) ? reinterpret_cast<NetworkAddressesFuncPtr>(netAddressesFuncPtr)() : NetworkAddressList();
}
NetworkMetrics networkMetrics;
NetworkInfo networkInfo;
protected:
INetwork() {}
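Editor's note: the NetworkMetrics/NetworkInfo reshuffle above also replaces the old `memset(this, 0, sizeof(*this))` constructor with in-class member initializers. memset-zeroing is only safe while every member is trivially copyable; once a struct holds something like the std::map in NetworkInfo, it would overwrite the container's internals and yield undefined behavior, whereas `= {}` / `= 0` initializers zero exactly the members that need it. A small illustration (types are stand-ins):

#include <map>
#include <string>

struct MetricsSketch {
    // In-class initializers: each member gets a well-defined zero value.
    double durations[4] = {};
    bool blocked[4] = {};
    double secSquaredSubmit = 0;
};

struct InfoSketch {
    MetricsSketch metrics;
    std::map<std::string, double> throttler;  // non-trivial member: memset(this, 0, ...)
                                              // here would corrupt the map's internal state
    // No hand-written constructor needed; the defaults above do the zeroing.
};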

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{51E254F0-440E-4746-B7B3-83051EB87E6B}'
Id='{6D62DCD3-AFF0-4665-923F-6BEA167C5507}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'