diff --git a/FDBLibTLS/FDBLibTLSSession.cpp b/FDBLibTLS/FDBLibTLSSession.cpp index d73f655dc9..d81dc4e509 100644 --- a/FDBLibTLS/FDBLibTLSSession.cpp +++ b/FDBLibTLS/FDBLibTLSSession.cpp @@ -347,7 +347,7 @@ bool FDBLibTLSSession::verify_peer() { if(now() - lastVerifyFailureLogged > 1.0) { for (std::string reason : verify_failure_reasons) { lastVerifyFailureLogged = now(); - TraceEvent("FDBLibTLSVerifyFailure", uid).detail("Reason", reason); + TraceEvent("FDBLibTLSVerifyFailure", uid).suppressFor(1.0).detail("Reason", reason); } } } diff --git a/cmake/FlowCommands.cmake b/cmake/FlowCommands.cmake index 19df995f25..5f6ad2785a 100644 --- a/cmake/FlowCommands.cmake +++ b/cmake/FlowCommands.cmake @@ -133,10 +133,10 @@ function(strip_debug_symbols target) COMMENT "Stripping symbols from ${target}") set(out_files "${out_file}") if(is_exec AND NOT APPLE) - add_custom_command(OUTPUT "${out_file}.debug" - COMMAND objcopy --only-keep-debug $ "${out_file}.debug" && - objcopy --add-gnu-debuglink="${out_file}.debug" ${out_file} - COMMENT "Copy debug symbols to ${out_name}.debug") + add_custom_command(OUTPUT "${out_file}.debug" + COMMAND objcopy --verbose --only-keep-debug $ "${out_file}.debug" + COMMAND objcopy --verbose --add-gnu-debuglink="${out_file}.debug" "${out_file}" + COMMENT "Copy debug symbols to ${out_name}.debug") list(APPEND out_files "${out_file}.debug") endif() add_custom_target(strip_${target} DEPENDS ${out_files}) diff --git a/documentation/sphinx/source/administration.rst b/documentation/sphinx/source/administration.rst index 14999a4e74..afbc27323a 100644 --- a/documentation/sphinx/source/administration.rst +++ b/documentation/sphinx/source/administration.rst @@ -492,6 +492,19 @@ If a process has had more than 10 TCP segments retransmitted in the last 5 secon 10.0.4.1:4500 ( 3% cpu; 2% machine; 0.004 Gbps; 0% disk; REXMIT! 
2.5 GB / 4.1 GB RAM ) +Machine-readable status +-------------------------------- + +The status command can provide a complete summary of statistics about the cluster and the database with the ``json`` argument. Full documentation for ``status json`` output can be found :doc:`here `. +From the output of ``status json``, operators can find useful health metrics to determine whether or not their cluster is hitting performance limits. + +====================== ============================================================================================================== +Ratekeeper limit ``cluster.qos.transactions_per_second_limit`` contains the number of read versions per second that the cluster can give out. A low ratekeeper limit indicates that the cluster performance is limited in some way. The reason for a low ratekeeper limit can be found at ``cluster.qos.performance_limited_by``. ``cluster.qos.released_transactions_per_second`` describes the number of read versions given out per second, and can be used to tell how close the ratekeeper is to throttling. +Storage queue size ``cluster.qos.worst_queue_bytes_storage_server`` contains the maximum size in bytes of a storage queue. Each storage server has mutations that have not yet been made durable, stored in its storage queue. If this value gets too large, it indicates a storage server is falling behind. A large storage queue will cause the ratekeeper to increase throttling. However, depending on the configuration, the ratekeeper can ignore the worst storage queue from one fault domain. Thus, ratekeeper uses ``cluster.qos.limiting_queue_bytes_storage_server`` to determine the throttling level. +Durable version lag ``cluster.qos.worst_durability_lag_storage_server`` contains information about the worst storage server durability lag. The ``versions`` subfield contains the maximum number of versions in a storage queue. Ideally, this should be near 5 million. 
The ``seconds`` subfield contains the maximum number of seconds of non-durable data in a storage queue. Ideally, this should be near 5 seconds. If a storage server is overwhelmed, the durability lag could rise, causing performance issues. +Transaction log queue ``cluster.qos.worst_queue_bytes_log_server`` contains the maximum size in bytes of the mutations stored on a transaction log that have not yet been popped by storage servers. A large transaction log queue size can potentially cause the ratekeeper to increase throttling. +====================== ============================================================================================================== + .. _administration_fdbmonitor: ``fdbmonitor`` and ``fdbserver`` diff --git a/documentation/sphinx/source/downloads.rst b/documentation/sphinx/source/downloads.rst index 8fe7e31338..4aa2575bff 100644 --- a/documentation/sphinx/source/downloads.rst +++ b/documentation/sphinx/source/downloads.rst @@ -10,38 +10,38 @@ macOS The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server. -* `FoundationDB-6.2.11.pkg `_ +* `FoundationDB-6.2.13.pkg `_ Ubuntu ------ The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x. -* `foundationdb-clients-6.2.11-1_amd64.deb `_ -* `foundationdb-server-6.2.11-1_amd64.deb `_ (depends on the clients package) +* `foundationdb-clients-6.2.13-1_amd64.deb `_ +* `foundationdb-server-6.2.13-1_amd64.deb `_ (depends on the clients package) RHEL/CentOS EL6 --------------- The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x. 
-* `foundationdb-clients-6.2.11-1.el6.x86_64.rpm `_ -* `foundationdb-server-6.2.11-1.el6.x86_64.rpm `_ (depends on the clients package) +* `foundationdb-clients-6.2.13-1.el6.x86_64.rpm `_ +* `foundationdb-server-6.2.13-1.el6.x86_64.rpm `_ (depends on the clients package) RHEL/CentOS EL7 --------------- The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x. -* `foundationdb-clients-6.2.11-1.el7.x86_64.rpm `_ -* `foundationdb-server-6.2.11-1.el7.x86_64.rpm `_ (depends on the clients package) +* `foundationdb-clients-6.2.13-1.el7.x86_64.rpm `_ +* `foundationdb-server-6.2.13-1.el7.x86_64.rpm `_ (depends on the clients package) Windows ------- The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server. -* `foundationdb-6.2.11-x64.msi `_ +* `foundationdb-6.2.13-x64.msi `_ API Language Bindings ===================== @@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package: -* `foundationdb-6.2.11.tar.gz `_ +* `foundationdb-6.2.13.tar.gz `_ Ruby 1.9.3/2.0.0+ ----------------- -* `fdb-6.2.11.gem `_ +* `fdb-6.2.13.gem `_ Java 8+ ------- -* `fdb-java-6.2.11.jar `_ -* `fdb-java-6.2.11-javadoc.jar `_ +* `fdb-java-6.2.13.jar `_ +* `fdb-java-6.2.13-javadoc.jar `_ Go 1.11+ -------- diff --git a/documentation/sphinx/source/tls.rst b/documentation/sphinx/source/tls.rst index 02254989cd..d527f8887c 100644 --- a/documentation/sphinx/source/tls.rst +++ b/documentation/sphinx/source/tls.rst @@ -138,12 +138,14 @@ Default Peer Verification The default peer verification is ``Check.Valid=1``. Default Password -^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^ There is no default password. If no password is specified, it is assumed that the private key is unencrypted. 
-Parameters and client bindings ------------------------------- +Permissions +----------- + +All files used by TLS must have sufficient read permissions such that the user running the FoundationDB server or client process can access them. It may also be necessary to have similar read permissions on the parent directories of the files used in the TLS configuration. Automatic TLS certificate refresh --------------------------------- diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 3807ab7cf9..8b942fc351 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -2587,6 +2587,27 @@ Future stopNetworkAfter( Future what ) { } } +ACTOR Future addInterface( std::map>* address_interface, Reference connectLock, KeyValue kv) { + wait(connectLock->take()); + state FlowLock::Releaser releaser(*connectLock); + state ClientWorkerInterface workerInterf = BinaryReader::fromStringRef(kv.value, IncludeVersion()); + state ClientLeaderRegInterface leaderInterf(workerInterf.address()); + choose { + when( Optional rep = wait( brokenPromiseToNever(leaderInterf.getLeader.getReply(GetLeaderRequest())) ) ) { + StringRef ip_port = kv.key.endsWith(LiteralStringRef(":tls")) ? kv.key.removeSuffix(LiteralStringRef(":tls")) : kv.key; + (*address_interface)[ip_port] = std::make_pair(kv.value, leaderInterf); + + if(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.present()) { + Key full_ip_port2 = StringRef(workerInterf.reboot.getEndpoint().addresses.secondaryAddress.get().toString()); + StringRef ip_port2 = full_ip_port2.endsWith(LiteralStringRef(":tls")) ? 
full_ip_port2.removeSuffix(LiteralStringRef(":tls")) : full_ip_port2; + (*address_interface)[ip_port2] = std::make_pair(kv.value, leaderInterf); + } + } + when( wait(delay(1.0)) ) {} + } + return Void(); +} + ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { state LineNoise& linenoise = *plinenoise; state bool intrans = false; @@ -2597,7 +2618,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { state bool writeMode = false; state std::string clusterConnectString; - state std::map address_interface; + state std::map> address_interface; state FdbOptions globalOptions; state FdbOptions activeOptions; @@ -2990,10 +3011,12 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { getTransaction(db, tr, options, intrans); if (tokens.size() == 1) { Standalone kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) ); + Reference connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM)); + std::vector> addInterfs; for( auto it : kvs ) { - auto ip_port = it.key.endsWith(LiteralStringRef(":tls")) ? it.key.removeSuffix(LiteralStringRef(":tls")) : it.key; - address_interface[ip_port] = it.value; + addInterfs.push_back(addInterface(&address_interface, connectLock, it)); } + wait( waitForAll(addInterfs) ); } if (tokens.size() == 1 || tokencmp(tokens[1], "list")) { if(address_interface.size() == 0) { @@ -3009,7 +3032,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { printf("\n"); } else if (tokencmp(tokens[1], "all")) { for( auto it : address_interface ) { - tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second); + tr->set(LiteralStringRef("\xff\xff/reboot_worker"), it.second.first); } if (address_interface.size() == 0) { printf("ERROR: no processes to kill. 
You must run the `kill’ command before running `kill all’.\n"); @@ -3027,7 +3050,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { if(!is_error) { for(int i = 1; i < tokens.size(); i++) { - tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]]); + tr->set(LiteralStringRef("\xff\xff/reboot_worker"), address_interface[tokens[i]].first); } printf("Attempted to kill %zu processes\n", tokens.size() - 1); } @@ -3302,9 +3325,12 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { getTransaction(db, tr, options, intrans); if (tokens.size() == 1) { Standalone kvs = wait( makeInterruptable( tr->getRange(KeyRangeRef(LiteralStringRef("\xff\xff/worker_interfaces"), LiteralStringRef("\xff\xff\xff")), 1) ) ); + Reference connectLock(new FlowLock(CLIENT_KNOBS->CLI_CONNECT_PARALLELISM)); + std::vector> addInterfs; for( auto it : kvs ) { - address_interface[it.key] = it.value; + addInterfs.push_back(addInterface(&address_interface, connectLock, it)); } + wait( waitForAll(addInterfs) ); } if (tokens.size() == 1 || tokencmp(tokens[1], "list")) { if(address_interface.size() == 0) { @@ -3320,7 +3346,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { printf("\n"); } else if (tokencmp(tokens[1], "all")) { for( auto it : address_interface ) { - tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second); + tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), it.second.first); } if (address_interface.size() == 0) { printf("ERROR: no processes to check. 
You must run the `expensive_data_check’ command before running `expensive_data_check all’.\n"); @@ -3338,7 +3364,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { if(!is_error) { for(int i = 1; i < tokens.size(); i++) { - tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]]); + tr->set(LiteralStringRef("\xff\xff/reboot_and_check_worker"), address_interface[tokens[i]].first); } printf("Attempted to kill and check %zu processes\n", tokens.size() - 1); } diff --git a/fdbcli/fdbcli.vcxproj b/fdbcli/fdbcli.vcxproj index 2c22697ee6..8e614943e4 100644 --- a/fdbcli/fdbcli.vcxproj +++ b/fdbcli/fdbcli.vcxproj @@ -85,10 +85,10 @@ TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;_DEBUG;_HAS_ITERATOR_DEBUGGING=0;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) ..\zookeeper\win32;..\zookeeper\generated;..\zookeeper\include;%(AdditionalIncludeDirectories) true + /bigobj @../flow/no_intellisense.opt %(AdditionalOptions) false MultiThreadedDebug - @../flow/no_intellisense.opt %(AdditionalOptions) - stdcpp17 + stdcpp17 Console @@ -110,12 +110,12 @@ TLS_DISABLED;WIN32;_WIN32_WINNT=0x0502;WINVER=0x0502;BOOST_ALL_NO_LIB;NTDDI_VERSION=0x05020000;NDEBUG;_CONSOLE;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) ..\zookeeper\win32;..\zookeeper\generated;..\zookeeper\include;%(AdditionalIncludeDirectories) true + /bigobj @../flow/no_intellisense.opt %(AdditionalOptions) Speed MultiThreaded false StreamingSIMDExtensions2 - @../flow/no_intellisense.opt %(AdditionalOptions) - stdcpp17 + stdcpp17 Console diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index 824b122564..3b5aa46298 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -45,7 +45,7 @@ ClientKnobs::ClientKnobs(bool randomize) { init( COORDINATOR_RECONNECTION_DELAY, 1.0 ); init( CLIENT_EXAMPLE_AMOUNT, 20 ); init( MAX_CLIENT_STATUS_AGE, 1.0 ); - init( MAX_CLIENT_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) 
MAX_CLIENT_PROXY_CONNECTIONS = 1; + init( MAX_PROXY_CONNECTIONS, 5 ); if( randomize && BUGGIFY ) MAX_PROXY_CONNECTIONS = 1; // wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin @@ -76,6 +76,7 @@ ClientKnobs::ClientKnobs(bool randomize) { init( GET_RANGE_SHARD_LIMIT, 2 ); init( WARM_RANGE_SHARD_LIMIT, 100 ); init( STORAGE_METRICS_SHARD_LIMIT, 100 ); if( randomize && BUGGIFY ) STORAGE_METRICS_SHARD_LIMIT = 3; + init( SHARD_COUNT_LIMIT, 80 ); if( randomize && BUGGIFY ) SHARD_COUNT_LIMIT = 3; init( STORAGE_METRICS_UNFAIR_SPLIT_LIMIT, 2.0/3.0 ); init( STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, 15.0 ); init( AGGREGATE_HEALTH_METRICS_MAX_STALENESS, 0.5 ); @@ -197,6 +198,9 @@ ClientKnobs::ClientKnobs(bool randomize) { } init(CSI_STATUS_DELAY, 10.0 ); - init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec + init( CONSISTENCY_CHECK_RATE_LIMIT_MAX, 50e6 ); // Limit in per sec init( CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME, 7 * 24 * 60 * 60 ); // 7 days + + //fdbcli + init( CLI_CONNECT_PARALLELISM, 20 ); } diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h index 6e1a50ed4a..25b26b0d12 100644 --- a/fdbclient/Knobs.h +++ b/fdbclient/Knobs.h @@ -44,7 +44,7 @@ public: double COORDINATOR_RECONNECTION_DELAY; int CLIENT_EXAMPLE_AMOUNT; double MAX_CLIENT_STATUS_AGE; - int MAX_CLIENT_PROXY_CONNECTIONS; + int MAX_PROXY_CONNECTIONS; // wrong_shard_server sometimes comes from the only nonfailed server, so we need to avoid a fast spin double WRONG_SHARD_SERVER_DELAY; // SOMEDAY: This delay can limit performance of retrieving data when the cache is mostly wrong (e.g. 
dumping the database after a test) @@ -75,6 +75,7 @@ public: int GET_RANGE_SHARD_LIMIT; int WARM_RANGE_SHARD_LIMIT; int STORAGE_METRICS_SHARD_LIMIT; + int SHARD_COUNT_LIMIT; double STORAGE_METRICS_UNFAIR_SPLIT_LIMIT; double STORAGE_METRICS_TOO_MANY_SHARDS_DELAY; double AGGREGATE_HEALTH_METRICS_MAX_STALENESS; @@ -189,6 +190,9 @@ public: int CONSISTENCY_CHECK_RATE_LIMIT_MAX; int CONSISTENCY_CHECK_ONE_ROUND_TARGET_COMPLETION_TIME; + + //fdbcli + int CLI_CONNECT_PARALLELISM; ClientKnobs(bool randomize = false); }; diff --git a/fdbclient/MonitorLeader.actor.cpp b/fdbclient/MonitorLeader.actor.cpp index df3a6737ab..6a5c4195de 100644 --- a/fdbclient/MonitorLeader.actor.cpp +++ b/fdbclient/MonitorLeader.actor.cpp @@ -670,6 +670,25 @@ ACTOR Future monitorLeaderForProxies( Key clusterKey, vector& lastProxyUIDs, std::vector& lastProxies ) { + if(ni.proxies.size() > CLIENT_KNOBS->MAX_PROXY_CONNECTIONS) { + std::vector proxyUIDs; + for(auto& proxy : ni.proxies) { + proxyUIDs.push_back(proxy.id()); + } + if(proxyUIDs != lastProxyUIDs) { + lastProxyUIDs = proxyUIDs; + lastProxies = ni.proxies; + deterministicRandom()->randomShuffle(lastProxies); + lastProxies.resize(CLIENT_KNOBS->MAX_PROXY_CONNECTIONS); + for(int i = 0; i < lastProxies.size(); i++) { + TraceEvent("ConnectedProxy").detail("Proxy", lastProxies[i].id()); + } + } + ni.proxies = lastProxies; + } +} + // Leader is the process that will be elected by coordinators as the cluster controller ACTOR Future monitorProxiesOneGeneration( Reference connFile, Reference> clientInfo, MonitorLeaderInfo info, Standalone> supportedVersions, Key traceLogGroup) { state ClusterConnectionString cs = info.intermediateConnFile->getConnectionString(); @@ -730,24 +749,8 @@ ACTOR Future monitorProxiesOneGeneration( ReferencenotifyConnected(); auto& ni = rep.get().mutate(); - if(ni.proxies.size() > CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS) { - std::vector proxyUIDs; - for(auto& proxy : ni.proxies) { - proxyUIDs.push_back(proxy.id()); - } - 
if(proxyUIDs != lastProxyUIDs) { - lastProxyUIDs = proxyUIDs; - lastProxies = ni.proxies; - deterministicRandom()->randomShuffle(lastProxies); - lastProxies.resize(CLIENT_KNOBS->MAX_CLIENT_PROXY_CONNECTIONS); - for(int i = 0; i < lastProxies.size(); i++) { - TraceEvent("ClientConnectedProxy").detail("Proxy", lastProxies[i].id()); - } - } - ni.proxies = lastProxies; - } - - clientInfo->set( rep.get().read() ); + shrinkProxyList(ni, lastProxyUIDs, lastProxies); + clientInfo->set( ni ); successIdx = idx; } else if(idx == successIdx) { wait(delay(CLIENT_KNOBS->COORDINATOR_RECONNECTION_DELAY)); diff --git a/fdbclient/MonitorLeader.h b/fdbclient/MonitorLeader.h index 89a128ec4f..0eae5151f7 100644 --- a/fdbclient/MonitorLeader.h +++ b/fdbclient/MonitorLeader.h @@ -59,6 +59,8 @@ Future monitorLeaderForProxies( Value const& key, vector c Future monitorProxies( Reference>> const& connFile, Reference> const& clientInfo, Standalone> const& supportedVersions, Key const& traceLogGroup ); +void shrinkProxyList( ClientDBInfo& ni, std::vector& lastProxyUIDs, std::vector& lastProxies ); + #pragma region Implementation Future monitorLeaderInternal( Reference const& connFile, Reference> const& outSerializedLeaderInfo ); diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 760e2d2831..79c9fdfef7 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1453,6 +1453,17 @@ ACTOR Future waitForCommittedVersion( Database cx, Version version ) { } } +ACTOR Future getRawVersion( Database cx ) { + loop { + choose { + when ( wait( cx->onMasterProxiesChanged() ) ) {} + when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(false), &MasterProxyInterface::getConsistentReadVersion, GetReadVersionRequest( 0, GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE ), cx->taskID ) ) ) { + return v.version; + } + } + } +} + ACTOR Future readVersionBatcher( DatabaseContext* cx, FutureStream, Optional>> versionStream, uint32_t flags); @@ 
-2133,6 +2144,10 @@ ACTOR Future watch( Reference watch, Database cx, Transaction *self return Void(); } +Future Transaction::getRawReadVersion() { + return ::getRawVersion(cx); +} + Future< Void > Transaction::watch( Reference watch ) { return ::watch(watch, cx, this); } @@ -3215,16 +3230,25 @@ ACTOR Future< StorageMetrics > waitStorageMetricsMultipleLocations( } } -ACTOR Future< StorageMetrics > waitStorageMetrics( +ACTOR Future< StorageMetrics > extractMetrics( Future, int>> fMetrics ) { + std::pair, int> x = wait(fMetrics); + return x.first.get(); +} + +ACTOR Future< std::pair, int> > waitStorageMetrics( Database cx, KeyRange keys, StorageMetrics min, StorageMetrics max, StorageMetrics permittedError, - int shardLimit ) + int shardLimit, + int expectedShardCount ) { loop { vector< pair> > locations = wait( getKeyRangeLocations( cx, keys, shardLimit, false, &StorageServerInterface::waitMetrics, TransactionInfo(TaskPriority::DataDistribution) ) ); + if(expectedShardCount >= 0 && locations.size() != expectedShardCount) { + return std::make_pair(Optional(), locations.size()); + } //SOMEDAY: Right now, if there are too many shards we delay and check again later. There may be a better solution to this. 
if(locations.size() < shardLimit) { @@ -3237,7 +3261,7 @@ ACTOR Future< StorageMetrics > waitStorageMetrics( fx = loadBalance( locations[0].second, &StorageServerInterface::waitMetrics, req, TaskPriority::DataDistribution ); } StorageMetrics x = wait(fx); - return x; + return std::make_pair(x,-1); } catch (Error& e) { if (e.code() != error_code_wrong_shard_server && e.code() != error_code_all_alternatives_failed) { TraceEvent(SevError, "WaitStorageMetricsError").error(e); @@ -3258,20 +3282,21 @@ ACTOR Future< StorageMetrics > waitStorageMetrics( } } -Future< StorageMetrics > Transaction::waitStorageMetrics( +Future< std::pair, int> > Transaction::waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, - int shardLimit ) + int shardLimit, + int expectedShardCount ) { - return ::waitStorageMetrics( cx, keys, min, max, permittedError, shardLimit ); + return ::waitStorageMetrics( cx, keys, min, max, permittedError, shardLimit, expectedShardCount ); } Future< StorageMetrics > Transaction::getStorageMetrics( KeyRange const& keys, int shardLimit ) { StorageMetrics m; m.bytes = -1; - return ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit ); + return extractMetrics( ::waitStorageMetrics( cx, keys, StorageMetrics(), m, StorageMetrics(), shardLimit, -1 ) ); } ACTOR Future< Standalone> > splitStorageMetrics( Database cx, KeyRange keys, StorageMetrics limit, StorageMetrics estimated ) diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index eadca69fe0..96d5daadd9 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -211,6 +211,7 @@ public: void setVersion( Version v ); Future getReadVersion() { return getReadVersion(0); } + Future getRawReadVersion(); [[nodiscard]] Future> get(const Key& key, bool snapshot = false); [[nodiscard]] Future watch(Reference watch); @@ -241,7 +242,7 @@ public: Future< Void > warmRange( Database 
cx, KeyRange keys ); - Future< StorageMetrics > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit ); + Future< std::pair, int> > waitStorageMetrics( KeyRange const& keys, StorageMetrics const& min, StorageMetrics const& max, StorageMetrics const& permittedError, int shardLimit, int expectedShardCount ); Future< StorageMetrics > getStorageMetrics( KeyRange const& keys, int shardLimit ); Future< Standalone> > splitStorageMetrics( KeyRange const& keys, StorageMetrics const& limit, StorageMetrics const& estimated ); diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 423b099018..5ccdbee1f8 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -282,15 +282,13 @@ struct GetShardStateRequest { struct StorageMetrics { constexpr static FileIdentifier file_identifier = 13622226; - int64_t bytes; // total storage - int64_t bytesPerKSecond; // network bandwidth (average over 10s) - int64_t iosPerKSecond; - int64_t bytesReadPerKSecond; + int64_t bytes = 0; // total storage + int64_t bytesPerKSecond = 0; // network bandwidth (average over 10s) + int64_t iosPerKSecond = 0; + int64_t bytesReadPerKSecond = 0; static const int64_t infinity = 1LL<<60; - StorageMetrics() : bytes(0), bytesPerKSecond(0), iosPerKSecond(0), bytesReadPerKSecond(0) {} - bool allLessOrEqual( const StorageMetrics& rhs ) const { return bytes <= rhs.bytes && bytesPerKSecond <= rhs.bytesPerKSecond && iosPerKSecond <= rhs.iosPerKSecond && bytesReadPerKSecond <= rhs.bytesReadPerKSecond; diff --git a/fdbrpc/AsyncFileKAIO.actor.h b/fdbrpc/AsyncFileKAIO.actor.h index 14495a6cdf..1724bf5584 100644 --- a/fdbrpc/AsyncFileKAIO.actor.h +++ b/fdbrpc/AsyncFileKAIO.actor.h @@ -416,7 +416,7 @@ public: ++ctx.countAIOSubmit; double elapsed = timer_monotonic() - begin; - g_network->networkMetrics.secSquaredSubmit += elapsed*elapsed/2; + 
g_network->networkInfo.metrics.secSquaredSubmit += elapsed*elapsed/2; //TraceEvent("Launched").detail("N", rc).detail("Queued", ctx.queue.size()).detail("Elapsed", elapsed).detail("Outstanding", ctx.outstanding+rc); //printf("launched: %d/%d in %f us (%d outstanding; lowest prio %d)\n", rc, ctx.queue.size(), elapsed*1e6, ctx.outstanding + rc, toStart[n-1]->getTask()); @@ -672,7 +672,7 @@ private: double t = timer_monotonic(); double elapsed = t - ctx.ioStallBegin; ctx.ioStallBegin = t; - g_network->networkMetrics.secSquaredDiskStall += elapsed*elapsed/2; + g_network->networkInfo.metrics.secSquaredDiskStall += elapsed*elapsed/2; } ctx.outstanding -= n; diff --git a/fdbrpc/LoadBalance.actor.h b/fdbrpc/LoadBalance.actor.h index c9a9d97afd..bd485c40ce 100644 --- a/fdbrpc/LoadBalance.actor.h +++ b/fdbrpc/LoadBalance.actor.h @@ -321,28 +321,28 @@ Future< REPLY_TYPE(Request) > loadBalance( } if(!alternatives->alwaysFresh()) { - if(now() - g_network->networkMetrics.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) { - g_network->networkMetrics.oldestAlternativesFailure = now(); + if(now() - g_network->networkInfo.newestAlternativesFailure > FLOW_KNOBS->ALTERNATIVES_FAILURE_RESET_TIME) { + g_network->networkInfo.oldestAlternativesFailure = now(); } double delay = FLOW_KNOBS->ALTERNATIVES_FAILURE_MIN_DELAY; - if(now() - g_network->networkMetrics.lastAlternativesFailureSkipDelay > FLOW_KNOBS->ALTERNATIVES_FAILURE_SKIP_DELAY) { - g_network->networkMetrics.lastAlternativesFailureSkipDelay = now(); + if(now() - g_network->networkInfo.lastAlternativesFailureSkipDelay > FLOW_KNOBS->ALTERNATIVES_FAILURE_SKIP_DELAY) { + g_network->networkInfo.lastAlternativesFailureSkipDelay = now(); } else { - double elapsed = now()-g_network->networkMetrics.oldestAlternativesFailure; + double elapsed = now()-g_network->networkInfo.oldestAlternativesFailure; delay = std::max(delay, std::min(elapsed*FLOW_KNOBS->ALTERNATIVES_FAILURE_DELAY_RATIO, 
FLOW_KNOBS->ALTERNATIVES_FAILURE_MAX_DELAY)); delay = std::max(delay, std::min(elapsed*FLOW_KNOBS->ALTERNATIVES_FAILURE_SLOW_DELAY_RATIO, FLOW_KNOBS->ALTERNATIVES_FAILURE_SLOW_MAX_DELAY)); } // Making this SevWarn means a lot of clutter - if(now() - g_network->networkMetrics.newestAlternativesFailure > 1 || deterministicRandom()->random01() < 0.01) { + if(now() - g_network->networkInfo.newestAlternativesFailure > 1 || deterministicRandom()->random01() < 0.01) { TraceEvent("AllAlternativesFailed") .detail("Interval", FLOW_KNOBS->CACHE_REFRESH_INTERVAL_WHEN_ALL_ALTERNATIVES_FAILED) .detail("Alternatives", alternatives->description()) .detail("Delay", delay); } - g_network->networkMetrics.newestAlternativesFailure = now(); + g_network->networkInfo.newestAlternativesFailure = now(); choose { when ( wait( quorum( ok, 1 ) ) ) {} diff --git a/fdbrpc/TLSConnection.actor.cpp b/fdbrpc/TLSConnection.actor.cpp index 83f0a6cd2e..d9fb72fea9 100644 --- a/fdbrpc/TLSConnection.actor.cpp +++ b/fdbrpc/TLSConnection.actor.cpp @@ -71,11 +71,29 @@ static int recv_func(void* ctx, uint8_t* buf, int len) { } ACTOR static Future handshake( TLSConnection* self ) { + state std::pair peerIP = std::make_pair(self->conn->getPeerAddress().ip, self->is_client ? 
self->conn->getPeerAddress().port : static_cast(0)); + if(!self->is_client) { + auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP)); + if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) { + if (now() < iter->second) { + TraceEvent("TLSIncomingConnectionThrottlingWarning", self->getDebugID()).suppressFor(1.0).detail("PeerIP", peerIP.first.toString()); + wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT)); + throw connection_failed(); + } else { + g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP); + } + } + } + loop { int r = self->session->handshake(); + if(BUGGIFY_WITH_PROB(0.001)) { + r = ITLSSession::FAILED; + } if ( r == ITLSSession::SUCCESS ) break; if ( r == ITLSSession::FAILED ) { TraceEvent("TLSConnectionHandshakeError", self->getDebugID()).suppressFor(1.0).detail("Peer", self->getPeerAddress()); + g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = now() + (self->is_client ? FLOW_KNOBS->TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT : FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT); throw connection_failed(); } ASSERT( r == ITLSSession::WANT_WRITE || r == ITLSSession::WANT_READ ); @@ -87,7 +105,7 @@ ACTOR static Future handshake( TLSConnection* self ) { return Void(); } -TLSConnection::TLSConnection( Reference const& conn, Reference const& policy, bool is_client, std::string host) : conn(conn), write_wants(0), read_wants(0), uid(conn->getDebugID()) { +TLSConnection::TLSConnection( Reference const& conn, Reference const& policy, bool is_client, std::string host) : conn(conn), write_wants(0), read_wants(0), uid(conn->getDebugID()), is_client(is_client) { const char * serverName = host.empty() ? 
NULL : host.c_str(); session = Reference( policy->create_session(is_client, serverName, send_func, this, recv_func, this, (void*)&uid) ); if ( !session ) { @@ -169,9 +187,25 @@ TLSNetworkConnections::TLSNetworkConnections( Reference options ) : g_network->setGlobal(INetwork::enumGlobal::enNetworkConnections, (flowGlobalType) this); } +ACTOR Future> waitAndFailConnection() { + wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT)); + throw connection_failed(); +} + Future> TLSNetworkConnections::connect( NetworkAddress toAddr, std::string host) { if ( toAddr.isTLS() ) { NetworkAddress clearAddr( toAddr.ip, toAddr.port, toAddr.isPublic(), false ); + std::pair peerIP = std::make_pair(toAddr.ip, toAddr.port); + auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP)); + if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) { + if (now() < iter->second) { + TraceEvent("TLSOutgoingConnectionThrottlingWarning").suppressFor(1.0).detail("PeerIP", toAddr); + return waitAndFailConnection(); + } else { + g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP); + } + } + TraceEvent("TLSConnectionConnecting").suppressFor(1.0).detail("ToAddr", toAddr); // For FDB<->FDB connections, we don't have hostnames and can't verify IP // addresses against certificates, so we have our own peer verifying logic diff --git a/fdbrpc/TLSConnection.h b/fdbrpc/TLSConnection.h index f8395b66a0..7e8bb38fd8 100644 --- a/fdbrpc/TLSConnection.h +++ b/fdbrpc/TLSConnection.h @@ -36,6 +36,7 @@ struct TLSConnection : IConnection, ReferenceCounted { int write_wants, read_wants; UID uid; + bool is_client; virtual void addref() { ReferenceCounted::addref(); } virtual void delref() { ReferenceCounted::delref(); } diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 00e4615f11..6051689e2c 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1170,7 +1170,7 @@ public: 
std::transform(newTLogs.begin(), newTLogs.end(), std::back_inserter(exclusionWorkerIds), fn); std::transform(newSatelliteTLogs.begin(), newSatelliteTLogs.end(), std::back_inserter(exclusionWorkerIds), fn); RoleFitness newRemoteTLogFit( - (db.config.usableRegions > 1 && dbi.recoveryState == RecoveryState::FULLY_RECOVERED) ? + (db.config.usableRegions > 1 && (dbi.recoveryState == RecoveryState::ALL_LOGS_RECRUITED || dbi.recoveryState == RecoveryState::FULLY_RECOVERED)) ? getWorkersForTlogs(db.config, db.config.getRemoteTLogReplicationFactor(), db.config.getDesiredRemoteLogs(), db.config.getRemoteTLogPolicy(), id_used, true, remoteDC, exclusionWorkerIds) : remote_tlogs, ProcessClass::TLog); if(oldRemoteTLogFit < newRemoteTLogFit) return false; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 618d9b6a4a..dbeaa0b3e0 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -774,59 +774,28 @@ struct DDTeamCollection : ReferenceCounted { int64_t bestLoadBytes = 0; Optional> bestOption; - std::vector>> randomTeams; - std::set< UID > sources; + std::vector> randomTeams; + const std::set completeSources(req.completeSources.begin(), req.completeSources.end()); if( !req.wantsNewServers ) { - std::vector> similarTeams; - bool foundExact = false; - - for( int i = 0; i < req.sources.size(); i++ ) - sources.insert( req.sources[i] ); - - for( int i = 0; i < req.sources.size(); i++ ) { - if( self->server_info.count( req.sources[i] ) ) { - auto& teamList = self->server_info[ req.sources[i] ]->teams; - for( int j = 0; j < teamList.size(); j++ ) { - if( teamList[j]->isHealthy() && (!req.preferLowerUtilization || teamList[j]->hasHealthyFreeSpace())) { - int sharedMembers = 0; - for( const UID& id : teamList[j]->getServerIDs() ) - if( sources.count( id ) ) - sharedMembers++; - - if( !foundExact && sharedMembers == teamList[j]->size() ) { - foundExact = true; - bestOption = Optional>(); - 
similarTeams.clear(); - } - - if( (sharedMembers == teamList[j]->size()) || (!foundExact && req.wantsTrueBest) ) { - int64_t loadBytes = SOME_SHARED * teamList[j]->getLoadBytes(true, req.inflightPenalty); - if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) { - bestLoadBytes = loadBytes; - bestOption = teamList[j]; - } - } - else if( !req.wantsTrueBest && !foundExact ) - similarTeams.push_back( teamList[j] ); + for( int i = 0; i < req.completeSources.size(); i++ ) { + if( !self->server_info.count( req.completeSources[i] ) ) { + continue; + } + auto& teamList = self->server_info[ req.completeSources[i] ]->teams; + for( int j = 0; j < teamList.size(); j++ ) { + bool found = true; + auto serverIDs = teamList[j]->getServerIDs(); + for( int k = 0; k < teamList[j]->size(); k++ ) { + if( !completeSources.count( serverIDs[k] ) ) { + found = false; + break; } } - } - } - - if( foundExact || (req.wantsTrueBest && bestOption.present() ) ) { - ASSERT( bestOption.present() ); - // Check the team size: be sure team size is correct - ASSERT(bestOption.get()->size() == self->configuration.storageTeamSize); - req.reply.send( bestOption ); - return Void(); - } - - if( !req.wantsTrueBest ) { - while( similarTeams.size() && randomTeams.size() < SERVER_KNOBS->BEST_TEAM_OPTION_COUNT ) { - int randomTeam = deterministicRandom()->randomInt( 0, similarTeams.size() ); - randomTeams.push_back( std::make_pair( SOME_SHARED, similarTeams[randomTeam] ) ); - swapAndPop( &similarTeams, randomTeam ); + if(found && teamList[j]->isHealthy()) { + req.reply.send( teamList[j] ); + return Void(); + } } } } @@ -835,7 +804,7 @@ struct DDTeamCollection : ReferenceCounted { ASSERT( !bestOption.present() ); for( int i = 0; i < self->teams.size(); i++ ) { if( self->teams[i]->isHealthy() && (!req.preferLowerUtilization || self->teams[i]->hasHealthyFreeSpace()) ) { - int64_t loadBytes = NONE_SHARED * 
self->teams[i]->getLoadBytes(true, req.inflightPenalty); + int64_t loadBytes = self->teams[i]->getLoadBytes(true, req.inflightPenalty); if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) { bestLoadBytes = loadBytes; bestOption = self->teams[i]; @@ -850,12 +819,15 @@ struct DDTeamCollection : ReferenceCounted { Reference dest = deterministicRandom()->randomChoice(self->teams); bool ok = dest->isHealthy() && (!req.preferLowerUtilization || dest->hasHealthyFreeSpace()); - for(int i=0; ok && igetServerIDs() == dest->getServerIDs()) + for(int i=0; ok && igetServerIDs() == dest->getServerIDs()) { ok = false; + break; + } + } if (ok) - randomTeams.push_back( std::make_pair( NONE_SHARED, dest ) ); + randomTeams.push_back( dest ); else nTries++; } @@ -866,10 +838,10 @@ struct DDTeamCollection : ReferenceCounted { } for( int i = 0; i < randomTeams.size(); i++ ) { - int64_t loadBytes = randomTeams[i].first * randomTeams[i].second->getLoadBytes(true, req.inflightPenalty); + int64_t loadBytes = randomTeams[i]->getLoadBytes(true, req.inflightPenalty); if( !bestOption.present() || ( req.preferLowerUtilization && loadBytes < bestLoadBytes ) || ( !req.preferLowerUtilization && loadBytes > bestLoadBytes ) ) { bestLoadBytes = loadBytes; - bestOption = randomTeams[i].second; + bestOption = randomTeams[i]; } } } @@ -878,30 +850,24 @@ struct DDTeamCollection : ReferenceCounted { // We will get stuck at this! This only happens when a DC fails. No need to consider it right now. 
if(!bestOption.present() && self->zeroHealthyTeams->get()) { //Attempt to find the unhealthy source server team and return it - std::set completeSources; for( int i = 0; i < req.completeSources.size(); i++ ) { - completeSources.insert( req.completeSources[i] ); - } - - int bestSize = 0; - for( int i = 0; i < req.completeSources.size(); i++ ) { - if( self->server_info.count( req.completeSources[i] ) ) { - auto& teamList = self->server_info[ req.completeSources[i] ]->teams; - for( int j = 0; j < teamList.size(); j++ ) { - bool found = true; - auto serverIDs = teamList[j]->getServerIDs(); - for( int k = 0; k < teamList[j]->size(); k++ ) { - if( !completeSources.count( serverIDs[k] ) ) { - found = false; - break; - } - } - if(found && teamList[j]->size() > bestSize) { - bestOption = teamList[j]; - bestSize = teamList[j]->size(); + if( !self->server_info.count( req.completeSources[i] ) ) { + continue; + } + auto& teamList = self->server_info[ req.completeSources[i] ]->teams; + for( int j = 0; j < teamList.size(); j++ ) { + bool found = true; + auto serverIDs = teamList[j]->getServerIDs(); + for( int k = 0; k < teamList[j]->size(); k++ ) { + if( !completeSources.count( serverIDs[k] ) ) { + found = false; + break; } } - break; + if(found) { + req.reply.send( teamList[j] ); + return Void(); + } } } } diff --git a/fdbserver/DataDistribution.actor.h b/fdbserver/DataDistribution.actor.h index 13adf4242c..e2130e5054 100644 --- a/fdbserver/DataDistribution.actor.h +++ b/fdbserver/DataDistribution.actor.h @@ -38,11 +38,6 @@ struct RelocateShard { RelocateShard( KeyRange const& keys, int priority ) : keys(keys), priority(priority) {} }; -enum { - SOME_SHARED = 2, - NONE_SHARED = 3 -}; - struct IDataDistributionTeam { virtual vector getLastKnownServerInterfaces() = 0; virtual int size() = 0; @@ -81,7 +76,6 @@ struct GetTeamRequest { bool wantsTrueBest; bool preferLowerUtilization; double inflightPenalty; - std::vector sources; std::vector completeSources; Promise< Optional< 
Reference > > reply; @@ -93,10 +87,6 @@ struct GetTeamRequest { ss << "WantsNewServers:" << wantsNewServers << " WantsTrueBest:" << wantsTrueBest << " PreferLowerUtilization:" << preferLowerUtilization << " inflightPenalty:" << inflightPenalty << ";"; - ss << "Sources:"; - for (auto& s : sources) { - ss << s.toString() << ","; - } ss << "CompleteSources:"; for (auto& cs : completeSources) { ss << cs.toString() << ","; diff --git a/fdbserver/DataDistributionQueue.actor.cpp b/fdbserver/DataDistributionQueue.actor.cpp index 8ea54aa255..88ffd0f008 100644 --- a/fdbserver/DataDistributionQueue.actor.cpp +++ b/fdbserver/DataDistributionQueue.actor.cpp @@ -54,14 +54,7 @@ struct RelocateData { rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM || rs.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM || rs.priority == SERVER_KNOBS->PRIORITY_SPLIT_SHARD || - rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT || - mergeWantsNewServers(rs.keys, rs.priority)), interval("QueuedRelocation") {} - - static bool mergeWantsNewServers(KeyRangeRef keys, int priority) { - return priority == SERVER_KNOBS->PRIORITY_MERGE_SHARD && - (SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 2 || - (SERVER_KNOBS->MERGE_ONTO_NEW_TEAM == 1 && keys.begin.startsWith(LiteralStringRef("\xff")))); - } + rs.priority == SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT), interval("QueuedRelocation") {} static bool isHealthPriority(int priority) { return priority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || @@ -946,7 +939,6 @@ ACTOR Future dataDistributionRelocator( DDQueueData *self, RelocateData rd if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT; auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, inflightPenalty); - req.sources = rd.src; req.completeSources = rd.completeSources; Optional> bestTeam = 
wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req))); // If a DC has no healthy team, we stop checking the other DCs until @@ -1450,7 +1442,7 @@ ACTOR Future dataDistributionQueue( .detail( "BytesWritten", self.bytesWritten ) .detail( "PriorityRecoverMove", self.priority_relocations[SERVER_KNOBS->PRIORITY_RECOVER_MOVE] ) .detail( "PriorityRebalanceUnderutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] ) - .detail( "PriorityRebalannceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] ) + .detail( "PriorityRebalanceOverutilizedTeam", self.priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] ) .detail( "PriorityTeamHealthy", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_HEALTHY] ) .detail( "PriorityTeamContainsUndesiredServer", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER] ) .detail( "PriorityTeamRedundant", self.priority_relocations[SERVER_KNOBS->PRIORITY_TEAM_REDUNDANT] ) diff --git a/fdbserver/DataDistributionTracker.actor.cpp b/fdbserver/DataDistributionTracker.actor.cpp index d4d3645cdc..ddae3324c0 100644 --- a/fdbserver/DataDistributionTracker.actor.cpp +++ b/fdbserver/DataDistributionTracker.actor.cpp @@ -35,6 +35,18 @@ enum BandwidthStatus { enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh }; +struct ShardMetrics { + StorageMetrics metrics; + double lastLowBandwidthStartTime; + int shardCount; + + bool operator == ( ShardMetrics const& rhs ) const { + return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime && shardCount == rhs.shardCount; + } + + ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount) : metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {} +}; + BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) { if( 
metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC ) return BandwidthStatusHigh; @@ -69,7 +81,7 @@ ACTOR Future updateMaxShardSize( Reference> dbSizeEstima struct ShardTrackedData { Future trackShard; Future trackBytes; - Reference>> stats; + Reference>> stats; }; struct DataDistributionTracker { @@ -106,7 +118,7 @@ struct DataDistributionTracker { void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, - Optional startingSize = Optional()); + Optional startingSize = Optional()); // Gets the permitted size and IO bounds for a shard. A shard that starts at allKeys.begin // (i.e. '') will have a permitted size of 0, since the database can contain no data. @@ -151,8 +163,13 @@ int64_t getMaxShardSize( double dbSizeEstimate ) { ACTOR Future trackShardBytes( DataDistributionTracker* self, KeyRange keys, - Reference>> shardMetrics) + Reference>> shardMetrics) { + state BandwidthStatus bandwidthStatus = shardMetrics->get().present() ? getBandwidthStatus( shardMetrics->get().get().metrics ) : BandwidthStatusNormal; + state double lastLowBandwidthStartTime = shardMetrics->get().present() ? shardMetrics->get().get().lastLowBandwidthStartTime : now(); + state int shardCount = shardMetrics->get().present() ? shardMetrics->get().get().shardCount : 1; + state ReadBandwidthStatus readBandwidthStatus = shardMetrics->get().present() ? getReadBandwidthStatus(shardMetrics->get().get().metrics) : ReadBandwidthStatusNormal; + wait( delay( 0, TaskPriority::DataDistribution ) ); /*TraceEvent("TrackShardBytesStarting") @@ -162,15 +179,12 @@ ACTOR Future trackShardBytes( .detail("StartingMetrics", shardMetrics->get().present() ? shardMetrics->get().get().metrics.bytes : 0) .detail("StartingMerges", shardMetrics->get().present() ? 
shardMetrics->get().get().merges : 0);*/ - state ReadBandwidthStatus readBandwidthStatus; try { loop { - ShardSizeBounds bounds; - if (shardMetrics->get().present()) { - auto bytes = shardMetrics->get().get().bytes; - auto bandwidthStatus = getBandwidthStatus(shardMetrics->get().get()); - auto newReadBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get()); - + state ShardSizeBounds bounds; + if( shardMetrics->get().present() ) { + auto bytes = shardMetrics->get().get().metrics.bytes; + auto newReadBandwidthStatus = getReadBandwidthStatus(shardMetrics->get().get().metrics); bounds.max.bytes = std::max( int64_t(bytes * 1.1), (int64_t)SERVER_KNOBS->MIN_SHARD_BYTES ); bounds.min.bytes = std::min( int64_t(bytes * 0.9), std::max(int64_t(bytes - (SERVER_KNOBS->MIN_SHARD_BYTES * 0.1)), (int64_t)0) ); bounds.permittedError.bytes = bytes * 0.1; @@ -227,30 +241,47 @@ ACTOR Future trackShardBytes( bounds.min.iosPerKSecond = 0; bounds.permittedError.iosPerKSecond = bounds.permittedError.infinity; - Transaction tr(self->cx); - StorageMetrics metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT ) ); + loop { + Transaction tr(self->cx); + std::pair, int> metrics = wait( tr.waitStorageMetrics( keys, bounds.min, bounds.max, bounds.permittedError, CLIENT_KNOBS->STORAGE_METRICS_SHARD_LIMIT, shardCount ) ); + if(metrics.first.present()) { + BandwidthStatus newBandwidthStatus = getBandwidthStatus( metrics.first.get() ); + if(newBandwidthStatus == BandwidthStatusLow && bandwidthStatus != BandwidthStatusLow) { + lastLowBandwidthStartTime = now(); + } + bandwidthStatus = newBandwidthStatus; - /*TraceEvent("ShardSizeUpdate") - .detail("Keys", keys) - .detail("UpdatedSize", metrics.metrics.bytes) - .detail("Bandwidth", metrics.metrics.bytesPerKSecond) - .detail("BandwidthStatus", getBandwidthStatus(metrics)) - .detail("BytesLower", bounds.min.bytes) - .detail("BytesUpper", bounds.max.bytes) - 
.detail("BandwidthLower", bounds.min.bytesPerKSecond) - .detail("BandwidthUpper", bounds.max.bytesPerKSecond) - .detail("ShardSizePresent", shardSize->get().present()) - .detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0) - .detail("TrackerID", trackerID);*/ + /*TraceEvent("ShardSizeUpdate") + .detail("Keys", keys) + .detail("UpdatedSize", metrics.metrics.bytes) + .detail("Bandwidth", metrics.metrics.bytesPerKSecond) + .detail("BandwidthStatus", getBandwidthStatus(metrics)) + .detail("BytesLower", bounds.min.bytes) + .detail("BytesUpper", bounds.max.bytes) + .detail("BandwidthLower", bounds.min.bytesPerKSecond) + .detail("BandwidthUpper", bounds.max.bytesPerKSecond) + .detail("ShardSizePresent", shardSize->get().present()) + .detail("OldShardSize", shardSize->get().present() ? shardSize->get().get().metrics.bytes : 0) + .detail("TrackerID", trackerID);*/ - if( shardMetrics->get().present() ) { - self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.bytes - shardMetrics->get().get().bytes ); - if(keys.begin >= systemKeys.begin) { - self->systemSizeEstimate += metrics.bytes - shardMetrics->get().get().bytes; + if( shardMetrics->get().present() ) { + self->dbSizeEstimate->set( self->dbSizeEstimate->get() + metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes ); + if(keys.begin >= systemKeys.begin) { + self->systemSizeEstimate += metrics.first.get().bytes - shardMetrics->get().get().metrics.bytes; + } + } + + shardMetrics->set( ShardMetrics(metrics.first.get(), lastLowBandwidthStartTime, shardCount) ); + break; + } else { + shardCount = metrics.second; + if(shardMetrics->get().present()) { + auto newShardMetrics = shardMetrics->get().get(); + newShardMetrics.shardCount = shardCount; + shardMetrics->set( newShardMetrics ); + } } } - - shardMetrics->set( metrics ); } } catch( Error &e ) { if (e.code() != error_code_actor_cancelled) @@ -290,10 +321,10 @@ ACTOR Future>> getSplitKeys( DataDistributionTracke
} } -ACTOR Future getFirstSize( Reference>> stats ) { +ACTOR Future getFirstSize( Reference>> stats ) { loop { if(stats->get().present()) - return stats->get().get().bytes; + return stats->get().get().metrics.bytes; wait( stats->onChange() ); } } @@ -333,8 +364,12 @@ ACTOR Future changeSizes( DataDistributionTracker* self, KeyRange keys, in return Void(); } -struct HasBeenTrueFor : NonCopyable { - explicit HasBeenTrueFor( bool value ) : trigger( value ? Void() : Future() ) {} +struct HasBeenTrueFor : ReferenceCounted { + explicit HasBeenTrueFor( Optional value ) { + if(value.present()) { + trigger = delayJittered(std::max(0.0, SERVER_KNOBS->DD_MERGE_COALESCE_DELAY + value.get().lastLowBandwidthStartTime - now()), decrementPriority(TaskPriority::DataDistribution) ) || cleared.getFuture(); + } + } Future set() { if( !trigger.isValid() ) { @@ -364,11 +399,11 @@ private: ACTOR Future shardSplitter( DataDistributionTracker* self, KeyRange keys, - Reference>> shardSize, + Reference>> shardSize, ShardSizeBounds shardBounds ) { - state StorageMetrics metrics = shardSize->get().get(); - state BandwidthStatus bandwidthStatus = getBandwidthStatus( shardSize->get().get() ); + state StorageMetrics metrics = shardSize->get().get().metrics; + state BandwidthStatus bandwidthStatus = getBandwidthStatus( metrics ); //Split TEST(true); // shard to be split @@ -418,17 +453,28 @@ ACTOR Future shardSplitter( self->output.send( RelocateShard( r, SERVER_KNOBS->PRIORITY_SPLIT_SHARD) ); } - self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().bytes ) ); + self->sizeChanges.add( changeSizes( self, keys, shardSize->get().get().metrics.bytes ) ); } else { wait( delay(1.0, TaskPriority::DataDistribution) ); //In case the reason the split point was off was due to a discrepancy between storage servers } return Void(); } +ACTOR Future brokenPromiseToReady( Future f ) { + try { + wait(f); + } catch( Error &e ) { + if(e.code() != error_code_broken_promise) { + throw; + } + } + 
return Void(); +} + Future shardMerger( DataDistributionTracker* self, KeyRange const& keys, - Reference>> shardSize ) + Reference>> shardSize ) { int64_t maxShardSize = self->maxShardSize->get().get(); @@ -442,11 +488,17 @@ Future shardMerger( int shardsMerged = 1; bool forwardComplete = false; KeyRangeRef merged; - StorageMetrics endingStats = shardSize->get().get(); - int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().bytes : 0; + StorageMetrics endingStats = shardSize->get().get().metrics; + int shardCount = shardSize->get().get().shardCount; + double lastLowBandwidthStartTime = shardSize->get().get().lastLowBandwidthStartTime; + if(FLOW_KNOBS->DELAY_JITTER_OFFSET*SERVER_KNOBS->DD_MERGE_COALESCE_DELAY > SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY && now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY) { + TraceEvent( g_network->isSimulated() ? SevError : SevWarnAlways, "ShardMergeTooSoon", self->distributorId).detail("Keys", keys).detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime); + } + + int64_t systemBytes = keys.begin >= systemKeys.begin ? shardSize->get().get().metrics.bytes : 0; loop { - Optional newMetrics; + Optional newMetrics; if( !forwardComplete ) { if( nextIter->range().end == allKeys.end ) { forwardComplete = true; @@ -456,7 +508,7 @@ Future shardMerger( newMetrics = nextIter->value().stats->get(); // If going forward, give up when the next shard's stats are not yet present. - if( !newMetrics.present() ) { + if( !newMetrics.present() || shardCount + newMetrics.get().shardCount >= CLIENT_KNOBS->SHARD_COUNT_LIMIT ) { --nextIter; forwardComplete = true; continue; @@ -468,10 +520,10 @@ Future shardMerger( // If going backward, stop when the stats are not present or if the shard is already over the merge // bounds. If this check triggers right away (if we have not merged anything) then return a trigger // on the previous shard changing "size". 
- if( !newMetrics.present() ) { + if( !newMetrics.present() || shardCount + newMetrics.get().shardCount >= CLIENT_KNOBS->SHARD_COUNT_LIMIT ) { if( shardsMerged == 1 ) { TEST( true ); // shardMerger cannot merge anything - return prevIter->value().stats->onChange(); + return brokenPromiseToReady( prevIter->value().stats->onChange() ); } ++prevIter; @@ -480,15 +532,18 @@ Future shardMerger( } merged = KeyRangeRef( prevIter->range().begin, nextIter->range().end ); - endingStats += newMetrics.get(); + endingStats += newMetrics.get().metrics; + shardCount += newMetrics.get().shardCount; + lastLowBandwidthStartTime = newMetrics.get().lastLowBandwidthStartTime; if((forwardComplete ? prevIter->range().begin : nextIter->range().begin) >= systemKeys.begin) { - systemBytes += newMetrics.get().bytes; + systemBytes += newMetrics.get().metrics.bytes; } shardsMerged++; auto shardBounds = getShardSizeBounds( merged, maxShardSize ); if( endingStats.bytes >= shardBounds.min.bytes || getBandwidthStatus( endingStats ) != BandwidthStatusLow || + now() - lastLowBandwidthStartTime < SERVER_KNOBS->DD_LOW_BANDWIDTH_DELAY || shardsMerged >= SERVER_KNOBS->DD_MERGE_LIMIT ) { // The merged range is larger than the min bounds so we cannot continue merging in this direction. 
// This means that: @@ -501,9 +556,10 @@ Future shardMerger( break; // If going forward, remove most recently added range - endingStats -= newMetrics.get(); + endingStats -= newMetrics.get().metrics; + shardCount -= newMetrics.get().shardCount; if(nextIter->range().begin >= systemKeys.begin) { - systemBytes -= newMetrics.get().bytes; + systemBytes -= newMetrics.get().metrics.bytes; } shardsMerged--; --nextIter; @@ -519,12 +575,14 @@ Future shardMerger( .detail("OldKeys", keys) .detail("NewKeys", mergeRange) .detail("EndingSize", endingStats.bytes) - .detail("BatchedMerges", shardsMerged); + .detail("BatchedMerges", shardsMerged) + .detail("LastLowBandwidthStartTime", lastLowBandwidthStartTime) + .detail("ShardCount", shardCount); if(mergeRange.begin < systemKeys.begin) { self->systemSizeEstimate -= systemBytes; } - restartShardTrackers( self, mergeRange, endingStats ); + restartShardTrackers( self, mergeRange, ShardMetrics(endingStats, lastLowBandwidthStartTime, shardCount) ); self->shardsAffectedByTeamFailure->defineShard( mergeRange ); self->output.send( RelocateShard( mergeRange, SERVER_KNOBS->PRIORITY_MERGE_SHARD ) ); @@ -535,8 +593,8 @@ Future shardMerger( ACTOR Future shardEvaluator( DataDistributionTracker* self, KeyRange keys, - Reference>> shardSize, - HasBeenTrueFor *wantsToMerge) + Reference>> shardSize, + Reference wantsToMerge) { Future onChange = shardSize->onChange() || yieldedFuture(self->maxShardSize->onChange()); @@ -544,7 +602,7 @@ ACTOR Future shardEvaluator( // getShardSizeBounds() will allways have shardBounds.min.bytes == 0 for shards that start at allKeys.begin, // so will will never attempt to merge that shard with the one previous. 
ShardSizeBounds shardBounds = getShardSizeBounds(keys, self->maxShardSize->get().get()); - StorageMetrics const& stats = shardSize->get().get(); + StorageMetrics const& stats = shardSize->get().get().metrics; auto bandwidthStatus = getBandwidthStatus( stats ); bool shouldSplit = stats.bytes > shardBounds.max.bytes || @@ -592,11 +650,8 @@ ACTOR Future shardEvaluator( ACTOR Future shardTracker( DataDistributionTracker* self, KeyRange keys, - Reference>> shardSize) + Reference>> shardSize) { - // Survives multiple calls to shardEvaluator and keeps merges from happening too quickly. - state HasBeenTrueFor wantsToMerge( shardSize->get().present() ); - wait( yieldedFuture(self->readyToStart.getFuture()) ); if( !shardSize->get().present() ) @@ -608,6 +663,9 @@ ACTOR Future shardTracker( // Since maxShardSize will become present for all shards at once, avoid slow tasks with a short delay wait( delay( 0, TaskPriority::DataDistribution ) ); + // Survives multiple calls to shardEvaluator and keeps merges from happening too quickly. + state Reference wantsToMerge( new HasBeenTrueFor( shardSize->get() ) ); + /*TraceEvent("ShardTracker", self->distributorId) .detail("Begin", keys.begin) .detail("End", keys.end) @@ -619,7 +677,7 @@ ACTOR Future shardTracker( try { loop { // Use the current known size to check for (and start) splits and merges. - wait( shardEvaluator( self, keys, shardSize, &wantsToMerge ) ); + wait( shardEvaluator( self, keys, shardSize, wantsToMerge ) ); // We could have a lot of actors being released from the previous wait at the same time. 
Immediately calling // delay(0) mitigates the resulting SlowTask @@ -632,7 +690,7 @@ ACTOR Future shardTracker( } } -void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Optional startingSize ) { +void restartShardTrackers( DataDistributionTracker* self, KeyRangeRef keys, Optional startingSize ) { auto ranges = self->shards.getAffectedRangesAfterInsertion( keys, ShardTrackedData() ); for(int i=0; i>> shardSize( new AsyncVar>() ); + Reference>> shardSize( new AsyncVar>() ); // For the case where the new tracker will take over at the boundaries of current shard(s) // we can use the old size if it is available. This will be the case when merging shards. @@ -697,7 +755,7 @@ ACTOR Future fetchShardMetrics_impl( DataDistributionTracker* self, GetMet onChange = stats->onChange(); break; } - returnMetrics += t.value().stats->get().get(); + returnMetrics += t.value().stats->get().get().metrics; } if( !onChange.isValid() ) { diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index c914598221..93ac93fae8 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -26,7 +26,7 @@ ServerKnobs const* SERVER_KNOBS = new ServerKnobs(); #define init( knob, value ) initKnob( knob, value, #knob ) -ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { +ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimulated) { // clang-format off // Versions init( VERSIONS_PER_SECOND, 1e6 ); @@ -106,7 +106,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( INFLIGHT_PENALTY_HEALTHY, 1.0 ); init( INFLIGHT_PENALTY_UNHEALTHY, 10.0 ); init( INFLIGHT_PENALTY_ONE_LEFT, 1000.0 ); - init( MERGE_ONTO_NEW_TEAM, 1 ); if( randomize && BUGGIFY ) MERGE_ONTO_NEW_TEAM = deterministicRandom()->coinflip() ? 
0 : 2; init( PRIORITY_RECOVER_MOVE, 110 ); init( PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, 120 ); @@ -114,13 +113,13 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( PRIORITY_TEAM_HEALTHY, 140 ); init( PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER, 150 ); init( PRIORITY_TEAM_REDUNDANT, 200 ); - init( PRIORITY_MERGE_SHARD, 340 ); init( PRIORITY_TEAM_UNHEALTHY, 700 ); init( PRIORITY_TEAM_2_LEFT, 709 ); init( PRIORITY_TEAM_1_LEFT, 800 ); init( PRIORITY_TEAM_FAILED, 805 ); init( PRIORITY_TEAM_0_LEFT, 809 ); - init( PRIORITY_SPLIT_SHARD, 900 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350; + init( PRIORITY_MERGE_SHARD, 940 ); if( randomize && BUGGIFY ) PRIORITY_MERGE_SHARD = 340; + init( PRIORITY_SPLIT_SHARD, 950 ); if( randomize && BUGGIFY ) PRIORITY_SPLIT_SHARD = 350; // Data distribution init( RETRY_RELOCATESHARD_DELAY, 0.1 ); @@ -190,7 +189,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( DATA_DISTRIBUTION_LOGGING_INTERVAL, 5.0 ); init( DD_ENABLED_CHECK_DELAY, 1.0 ); init( DD_STALL_CHECK_DELAY, 0.4 ); //Must be larger than 2*MAX_BUGGIFIED_DELAY - init( DD_MERGE_COALESCE_DELAY, 120.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001; + init( DD_LOW_BANDWIDTH_DELAY, isSimulated ? 90.0 : 240.0 ); if( randomize && BUGGIFY ) DD_LOW_BANDWIDTH_DELAY = 0; //Because of delayJitter, this should be less than 0.9 * DD_MERGE_COALESCE_DELAY + init( DD_MERGE_COALESCE_DELAY, isSimulated ?
120.0 : 300.0 ); if( randomize && BUGGIFY ) DD_MERGE_COALESCE_DELAY = 0.001; init( STORAGE_METRICS_POLLING_DELAY, 2.0 ); if( randomize && BUGGIFY ) STORAGE_METRICS_POLLING_DELAY = 15.0; init( STORAGE_METRICS_RANDOM_DELAY, 0.2 ); init( FREE_SPACE_RATIO_CUTOFF, 0.1 ); @@ -352,7 +352,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( SAMPLE_EXPIRATION_TIME, 1.0 ); init( SAMPLE_POLL_TIME, 0.1 ); init( RESOLVER_STATE_MEMORY_LIMIT, 1e6 ); - init( LAST_LIMITED_RATIO, 0.6 ); + init( LAST_LIMITED_RATIO, 2.0 ); //Cluster Controller init( CLUSTER_CONTROLLER_LOGGING_DELAY, 5.0 ); @@ -488,6 +488,10 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) { init( BYTE_SAMPLE_LOAD_DELAY, 0.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_LOAD_DELAY = 0.1; init( BYTE_SAMPLE_START_DELAY, 1.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_START_DELAY = 0.0; init( UPDATE_STORAGE_PROCESS_STATS_INTERVAL, 5.0 ); + init( BEHIND_CHECK_DELAY, 2.0 ); + init( BEHIND_CHECK_COUNT, 2 ); + init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND ); + init( WAIT_METRICS_WRONG_SHARD_CHANCE, 0.1 ); //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index c654f23916..fc28e8202c 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -106,8 +106,7 @@ public: double INFLIGHT_PENALTY_REDUNDANT; double INFLIGHT_PENALTY_UNHEALTHY; double INFLIGHT_PENALTY_ONE_LEFT; - int MERGE_ONTO_NEW_TEAM; // Merges will request new servers. 0 for off, 1 for \xff only, 2 for all shards. - + // Higher priorities are executed first // Priority/100 is the "priority group"/"superpriority". 
Priority inversion // is possible within but not between priority groups; fewer priority groups @@ -119,12 +118,12 @@ public: int PRIORITY_TEAM_HEALTHY; int PRIORITY_TEAM_CONTAINS_UNDESIRED_SERVER; int PRIORITY_TEAM_REDUNDANT; - int PRIORITY_MERGE_SHARD; int PRIORITY_TEAM_UNHEALTHY; int PRIORITY_TEAM_2_LEFT; int PRIORITY_TEAM_1_LEFT; int PRIORITY_TEAM_FAILED; // Priority when a server in the team is excluded as failed int PRIORITY_TEAM_0_LEFT; + int PRIORITY_MERGE_SHARD; int PRIORITY_SPLIT_SHARD; // Data distribution @@ -151,6 +150,7 @@ public: double DATA_DISTRIBUTION_LOGGING_INTERVAL; double DD_ENABLED_CHECK_DELAY; double DD_STALL_CHECK_DELAY; + double DD_LOW_BANDWIDTH_DELAY; double DD_MERGE_COALESCE_DELAY; double STORAGE_METRICS_POLLING_DELAY; double STORAGE_METRICS_RANDOM_DELAY; @@ -428,6 +428,10 @@ public: double BYTE_SAMPLE_LOAD_DELAY; double BYTE_SAMPLE_START_DELAY; double UPDATE_STORAGE_PROCESS_STATS_INTERVAL; + double BEHIND_CHECK_DELAY; + int BEHIND_CHECK_COUNT; + int64_t BEHIND_CHECK_VERSIONS; + double WAIT_METRICS_WRONG_SHARD_CHANCE; //Wait Failure int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS; @@ -474,7 +478,7 @@ public: int64_t FASTRESTORE_HEARTBEAT_INTERVAL; double FASTRESTORE_SAMPLING_PERCENT; - ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL); + ServerKnobs(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false); }; extern ServerKnobs const* SERVER_KNOBS; diff --git a/fdbserver/LogSystemConfig.h b/fdbserver/LogSystemConfig.h index a45db2f688..ea93a83049 100644 --- a/fdbserver/LogSystemConfig.h +++ b/fdbserver/LogSystemConfig.h @@ -217,6 +217,19 @@ struct LogSystemConfig { return format("type: %d oldGenerations: %d tags: %d %s", logSystemType, oldTLogs.size(), logRouterTags, describe(tLogs).c_str()); } + Optional getRemoteDcId() const { + for( int i = 0; i < tLogs.size(); i++ ) { + if(!tLogs[i].isLocal) { + for( int j = 0; j < tLogs[i].tLogs.size(); j++ ) { + if( tLogs[i].tLogs[j].present() ) { + return 
tLogs[i].tLogs[j].interf().locality.dcId(); + } + } + } + } + return Optional(); + } + std::vector allLocalLogs(bool includeSatellite = true) const { std::vector results; for( int i = 0; i < tLogs.size(); i++ ) { diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index e434d96c31..068fec48e8 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -219,8 +219,6 @@ struct ProxyCommitData { NotifiedVersion latestLocalCommitBatchResolving; NotifiedVersion latestLocalCommitBatchLogging; - PromiseStream commitBatchStartNotifications; - PromiseStream> commitBatchVersions; // 1:1 with commitBatchStartNotifications RequestStream getConsistentReadVersion; RequestStream commit; Database cx; @@ -432,7 +430,6 @@ ACTOR Future commitBatcher(ProxyCommitData *commitData, PromiseStreamcommitBatchStartNotifications.send(Void()); if(now() - lastBatch > commitData->commitBatchInterval) { timeout = delayJittered(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, TaskPriority::ProxyCommitBatcher); } @@ -444,7 +441,6 @@ ACTOR Future commitBatcher(ProxyCommitData *commitData, PromiseStream CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) { out.send({ batch, batchBytes }); lastBatch = now(); - commitData->commitBatchStartNotifications.send(Void()); timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher); batch = std::vector(); batchBytes = 0; @@ -508,7 +504,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, std::map SERVER_KNOBS->DESIRED_TOTAL_BYTES) { yieldBytes = 0; - wait(yield()); + wait(yield(TaskPriority::ProxyCommitYield2)); } valueWriter.serializeBytes(blobIter->data); yieldBytes += blobIter->data.size(); @@ -602,21 +598,16 @@ ACTOR Future commitBatch( if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before"); - if (trs.empty()) { - // We are sending an empty 
batch, so we have to trigger the version fetcher - self->commitBatchStartNotifications.send(Void()); - } - /////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield) TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1)); - wait(yield()); + wait(yield(TaskPriority::ProxyCommitYield1)); if (debugID.present()) g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion"); - Future fVersionReply = waitNext(self->commitBatchVersions.getFuture()); - GetCommitVersionReply versionReply = wait(fVersionReply); + GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid); + GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) ); self->mostRecentProcessedRequestNumber = versionReply.requestNum; self->stats.txnCommitVersionAssigned += trs.size(); @@ -674,7 +665,7 @@ ACTOR Future commitBatch( ////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be) TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber-1); // Queuing post-resolution commit processing wait(self->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber-1)); - wait(yield()); + wait(yield(TaskPriority::ProxyCommitYield2)); self->stats.txnCommitResolved += trs.size(); @@ -832,7 +823,7 @@ ACTOR Future commitBatch( for (; mutationNum < pMutations->size(); mutationNum++) { if(yieldBytes > SERVER_KNOBS->DESIRED_TOTAL_BYTES) { yieldBytes = 0; - wait(yield()); + wait(yield(TaskPriority::ProxyCommitYield2)); } auto& m = 
(*pMutations)[mutationNum]; @@ -1014,7 +1005,7 @@ ACTOR Future commitBatch( } self->lastCommitLatency = now()-commitStartTime; self->lastCommitTime = std::max(self->lastCommitTime.get(), commitStartTime); - wait(yield()); + wait(yield(TaskPriority::ProxyCommitYield3)); if( self->popRemoteTxs && msg.popTo > ( self->txsPopVersions.size() ? self->txsPopVersions.back().second : self->lastTxsPop ) ) { if(self->txsPopVersions.size() >= SERVER_KNOBS->MAX_TXS_POP_VERSION_HISTORY) { @@ -1162,14 +1153,6 @@ ACTOR Future getLiveCommittedVersion(ProxyCommitData* commi return rep; } -ACTOR Future fetchVersions(ProxyCommitData *commitData) { - loop { - waitNext(commitData->commitBatchStartNotifications.getFuture()); - GetCommitVersionRequest req(commitData->commitVersionRequestNumber++, commitData->mostRecentProcessedRequestNumber, commitData->dbgid); - commitData->commitBatchVersions.send(brokenPromiseToNever(commitData->master.getCommitVersion.getReply(req))); - } -} - struct TransactionRateInfo { double rate; double limit; @@ -1661,7 +1644,6 @@ ACTOR Future masterProxyServerCore( state GetHealthMetricsReply healthMetricsReply; state GetHealthMetricsReply detailedHealthMetricsReply; - addActor.send( fetchVersions(&commitData) ); addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) ); //TraceEvent("ProxyInit1", proxy.id()); diff --git a/fdbserver/OldTLogServer_6_0.actor.cpp b/fdbserver/OldTLogServer_6_0.actor.cpp index 7bf6b83700..2933f59a44 100644 --- a/fdbserver/OldTLogServer_6_0.actor.cpp +++ b/fdbserver/OldTLogServer_6_0.actor.cpp @@ -2301,9 +2301,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit } wait(logData->committingQueue.getFuture() || logData->removed ); } catch( Error &e ) { - if(e.code() != error_code_actor_cancelled) { - req.reply.sendError(e); - } + req.reply.sendError(recruitment_failed()); if( e.code() != error_code_worker_removed ) { throw; diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp 
index 590f4a4c98..b2ededc633 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -182,6 +182,7 @@ struct RatekeeperData { RatekeeperLimits batchLimits; Deque actualTpsHistory; + Optional remoteDC; RatekeeperData() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")), @@ -384,7 +385,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { // Look at each storage server's write queue and local rate, compute and store the desired rate ratio for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) { auto& ss = i->value; - if (!ss.valid) continue; + if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) continue; ++sscount; limitReason_t ssLimitReason = limitReason_t::unlimited; @@ -537,7 +538,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) { Version minLimitingSSVer = std::numeric_limits::max(); for (const auto& it : self->storageQueueInfo) { auto& ss = it.value; - if (!ss.valid) continue; + if (!ss.valid || (self->remoteDC.present() && ss.locality.dcId() == self->remoteDC)) continue; minSSVer = std::min(minSSVer, ss.lastReply.version); @@ -741,6 +742,8 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Referenceget().logSystemConfig.getRemoteDcId(); + try { state bool lastLimited = false; loop choose { @@ -797,6 +800,7 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Referenceget().logSystemConfig.getRemoteDcId(); } when ( wait(collection) ) { ASSERT(false); diff --git a/fdbserver/TLogServer.actor.cpp b/fdbserver/TLogServer.actor.cpp index 0285ebf0b9..281ce87bfd 100644 --- a/fdbserver/TLogServer.actor.cpp +++ b/fdbserver/TLogServer.actor.cpp @@ -2756,9 +2756,7 @@ ACTOR Future tLogStart( TLogData* self, InitializeTLogRequest req, Localit } 
wait(logData->committingQueue.getFuture() || logData->removed ); } catch( Error &e ) { - if(e.code() != error_code_actor_cancelled) { - req.reply.sendError(e); - } + req.reply.sendError(recruitment_failed()); if( e.code() != error_code_worker_removed ) { throw; diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index 68b3e21b24..2250f3242d 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -436,7 +436,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted> tLogCommitResults; for(int loc=0; loc< it->logServers.size(); loc++) { Standalone msg = data.getMessages(location); - allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::TLogCommitReply ) ); + allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( msg.arena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, msg, debugID ), TaskPriority::ProxyTLogCommitReply ) ); Future commitSuccess = success(allReplies.back()); addActor.get().send(commitSuccess); tLogCommitResults.push_back(commitSuccess); diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index d3f7377046..067ece4522 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -1143,10 +1143,8 @@ private: } case OPT_TRACECLOCK: { const char* a = args.OptionArg(); - if (!strcmp(a, "realtime")) - g_trace_clock = TRACE_CLOCK_REALTIME; - else if (!strcmp(a, "now")) - g_trace_clock = TRACE_CLOCK_NOW; + if (!strcmp(a, "realtime")) g_trace_clock.store(TRACE_CLOCK_REALTIME); + else if (!strcmp(a, "now")) g_trace_clock.store(TRACE_CLOCK_NOW); else { fprintf(stderr, "ERROR: Unknown clock source `%s'\n", a); printHelpTeaser(argv[0]); @@ -1538,7 +1536,7 @@ int main(int argc, char* argv[]) { 
delete CLIENT_KNOBS; FlowKnobs* flowKnobs = new FlowKnobs(true, role == Simulation); ClientKnobs* clientKnobs = new ClientKnobs(true); - ServerKnobs* serverKnobs = new ServerKnobs(true, clientKnobs); + ServerKnobs* serverKnobs = new ServerKnobs(true, clientKnobs, role == Simulation); FLOW_KNOBS = flowKnobs; SERVER_KNOBS = serverKnobs; CLIENT_KNOBS = clientKnobs; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 697b4571ec..cfb8d68bdf 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -434,6 +434,7 @@ public: bool shuttingDown; bool behind; + bool versionBehind; bool debug_inApplyUpdate; double debug_lastValidateTime; @@ -530,7 +531,7 @@ public: shuttingDown(false), debug_inApplyUpdate(false), debug_lastValidateTime(0), watchBytes(0), numWatches(0), logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()), readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")), - behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), + behind(false), versionBehind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false), lastUpdate(now()), poppedAllAfter(std::numeric_limits::max()), cpuUsage(0.0), diskUsage(0.0) { version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id); @@ -765,7 +766,7 @@ ACTOR Future waitForVersion( StorageServer* data, Version version ) { else if (version <= data->version.get()) return version; - if(data->behind && version > data->version.get()) { + if((data->behind || data->versionBehind) && version > data->version.get()) { throw process_behind(); } @@ -3419,9 +3420,18 @@ ACTOR Future waitMetrics( StorageServerMetrics* self, WaitMetricsRequest r break; } - if ( timedout || !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) { - TEST( !timedout ); // ShardWaitMetrics return case 2 (delayed) - TEST( timedout ); // 
ShardWaitMetrics return on timeout + if( timedout ) { + TEST( true ); // ShardWaitMetrics return on timeout + if(deterministicRandom()->random01() < SERVER_KNOBS->WAIT_METRICS_WRONG_SHARD_CHANCE) { + req.reply.sendError( wrong_shard_server() ); + } else { + req.reply.send( metrics ); + } + break; + } + + if ( !req.min.allLessOrEqual( metrics ) || !metrics.allLessOrEqual( req.max ) ) { + TEST( true ); // ShardWaitMetrics return case 2 (delayed) req.reply.send( metrics ); break; } @@ -3510,6 +3520,28 @@ ACTOR Future logLongByteSampleRecovery(Future recovery) { return Void(); } +ACTOR Future checkBehind( StorageServer* self ) { + state int behindCount = 0; + loop { + wait( delay(SERVER_KNOBS->BEHIND_CHECK_DELAY) ); + state Transaction tr(self->cx); + loop { + try { + Version readVersion = wait( tr.getRawReadVersion() ); + if( readVersion > self->version.get() + SERVER_KNOBS->BEHIND_CHECK_VERSIONS ) { + behindCount++; + } else { + behindCount = 0; + } + self->versionBehind = behindCount >= SERVER_KNOBS->BEHIND_CHECK_COUNT; + break; + } catch( Error &e ) { + wait(tr.onError(e)); + } + } + } +} + ACTOR Future storageServerCore( StorageServer* self, StorageServerInterface ssi ) { state Future doUpdate = Void(); @@ -3526,6 +3558,7 @@ ACTOR Future storageServerCore( StorageServer* self, StorageServerInterfac actors.add(self->otherError.getFuture()); actors.add(metricsCore(self, ssi)); actors.add(logLongByteSampleRecovery(self->byteSampleRecovery)); + actors.add(checkBehind(self)); self->coreStarted.send( Void() ); diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 54e9398e8f..72ef0f13b0 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -69,15 +69,23 @@ extern IKeyValueStore* keyValueStoreCompressTestData(IKeyValueStore* store); ACTOR static Future extractClientInfo( Reference> db, Reference> info ) { + state std::vector lastProxyUIDs; + state std::vector lastProxies; loop { - info->set( db->get().client ); + ClientDBInfo ni 
= db->get().client; + shrinkProxyList(ni, lastProxyUIDs, lastProxies); + info->set( ni ); wait( db->onChange() ); } } ACTOR static Future extractClientInfo( Reference>> db, Reference> info ) { + state std::vector lastProxyUIDs; + state std::vector lastProxies; loop { - info->set( db->get().read().client ); + ClientDBInfo ni = db->get().read().client; + shrinkProxyList(ni, lastProxyUIDs, lastProxies); + info->set( ni ); wait( db->onChange() ); } } diff --git a/flow/Knobs.cpp b/flow/Knobs.cpp index d1510817e4..fe88f79d65 100644 --- a/flow/Knobs.cpp +++ b/flow/Knobs.cpp @@ -74,6 +74,8 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) { init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 ); init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 ); + init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 ); + init( TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT, 11.0 ); init( NETWORK_TEST_REPLY_SIZE, 600e3 ); diff --git a/flow/Knobs.h b/flow/Knobs.h index 333c1c1222..a5cdf1ca10 100644 --- a/flow/Knobs.h +++ b/flow/Knobs.h @@ -91,9 +91,11 @@ public: int USE_OBJECT_SERIALIZER; int TLS_CERT_REFRESH_DELAY_SECONDS; + double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT; + double TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT; int NETWORK_TEST_REPLY_SIZE; - + //AsyncFileCached int64_t PAGE_CACHE_4K; int64_t PAGE_CACHE_64K; diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 92aef230ea..b06f3f2b48 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -167,7 +167,6 @@ public: uint64_t numYields; - double lastPriorityTrackTime; TaskPriority lastMinTaskID; std::priority_queue> ready; @@ -521,7 +520,7 @@ Net2::Net2(bool useThreadPool, bool useMetrics) int priBins[] = { 1, 2050, 3050, 4050, 4950, 5050, 7050, 8050, 10050 }; static_assert( sizeof(priBins) == sizeof(int)*NetworkMetrics::PRIORITY_BINS, "Fix priority bins"); for(int i=0; i(priBins[i]); + networkInfo.metrics.priorityBins[i] = static_cast(priBins[i]); updateNow(); } @@ -737,22 +736,21 @@ void Net2::run() { void Net2::trackMinPriority( 
TaskPriority minTaskID, double now ) { if (minTaskID != lastMinTaskID) { for(int c=0; c minTaskID && pri <= lastMinTaskID) { // busy -> idle - double busyFor = lastPriorityTrackTime - networkMetrics.priorityTimer[c]; - networkMetrics.priorityBlocked[c] = false; - networkMetrics.priorityBlockedDuration[c] += busyFor; - networkMetrics.secSquaredPriorityBlocked[c] += busyFor * busyFor; + networkInfo.metrics.priorityBlocked[c] = false; + networkInfo.metrics.priorityBlockedDuration[c] += now - networkInfo.metrics.windowedPriorityTimer[c]; + networkInfo.metrics.priorityMaxBlockedDuration[c] = std::max(networkInfo.metrics.priorityMaxBlockedDuration[c], now - networkInfo.metrics.priorityTimer[c]); } if (pri <= minTaskID && pri > lastMinTaskID) { // idle -> busy - networkMetrics.priorityBlocked[c] = true; - networkMetrics.priorityTimer[c] = now; + networkInfo.metrics.priorityBlocked[c] = true; + networkInfo.metrics.priorityTimer[c] = now; + networkInfo.metrics.windowedPriorityTimer[c] = now; } } } lastMinTaskID = minTaskID; - lastPriorityTrackTime = now; } void Net2::processThreadReady() { @@ -772,7 +770,7 @@ void Net2::checkForSlowTask(int64_t tscBegin, int64_t tscEnd, double duration, T int64_t elapsed = tscEnd-tscBegin; if (elapsed > FLOW_KNOBS->TSC_YIELD_TIME && tscBegin > 0) { int i = std::min(NetworkMetrics::SLOW_EVENT_BINS-1, log( elapsed/1e6 ) / log(2.)); - ++networkMetrics.countSlowEvents[i]; + ++networkInfo.metrics.countSlowEvents[i]; int64_t warnThreshold = g_network->isSimulated() ? 
10e9 : 500e6; //printf("SlowTask: %d, %d yields\n", (int)(elapsed/1e6), numYields); diff --git a/flow/Platform.cpp b/flow/Platform.cpp index 75c2b056b5..0847c9c2d4 100644 --- a/flow/Platform.cpp +++ b/flow/Platform.cpp @@ -133,12 +133,12 @@ std::string removeWhitespace(const std::string &t) if (found != std::string::npos) str.erase(found + 1); else - str.clear(); // str is all whitespace + str.clear(); // str is all whitespace found = str.find_first_not_of(ws); if (found != std::string::npos) str.erase(0, found); else - str.clear(); // str is all whitespace + str.clear(); // str is all whitespace return str; } @@ -2805,6 +2805,8 @@ extern volatile bool net2backtraces_overflow; extern volatile int64_t net2backtraces_count; extern std::atomic net2liveness; extern void initProfiling(); + +std::atomic checkThreadTime; #endif volatile thread_local bool profileThread = false; @@ -2852,7 +2854,9 @@ void profileHandler(int sig) { // We are casting away the volatile-ness of the backtrace array, but we believe that should be reasonably safe in the signal handler ProfilingSample* ps = const_cast((volatile ProfilingSample*)(net2backtraces + net2backtraces_offset)); - ps->timestamp = timer(); + // We can only read the check thread time in a signal handler if the atomic is lock free. + // We can't get the time from a timer() call because it's not signal safe. + ps->timestamp = checkThreadTime.is_lock_free() ? checkThreadTime.load() : 0; // SOMEDAY: should we limit the maximum number of frames from backtrace beyond just available space? 
size_t size = backtrace(ps->frames, net2backtraces_max - net2backtraces_offset - 2); @@ -2899,6 +2903,7 @@ void* checkThread(void *arg) { } lastSignal = t; + checkThreadTime.store(lastSignal); pthread_kill(mainThread, SIGPROF); } } diff --git a/flow/SystemMonitor.cpp b/flow/SystemMonitor.cpp index b2ea8e5735..e523c9d6d1 100644 --- a/flow/SystemMonitor.cpp +++ b/flow/SystemMonitor.cpp @@ -95,8 +95,8 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta .detail("MachineID", machineState.machineId) .detail("AIOSubmitCount", netData.countAIOSubmit - statState->networkState.countAIOSubmit) .detail("AIOCollectCount", netData.countAIOCollect - statState->networkState.countAIOCollect) - .detail("AIOSubmitLag", (g_network->networkMetrics.secSquaredSubmit - statState->networkMetricsState.secSquaredSubmit) / currentStats.elapsed) - .detail("AIODiskStall", (g_network->networkMetrics.secSquaredDiskStall - statState->networkMetricsState.secSquaredDiskStall) / currentStats.elapsed) + .detail("AIOSubmitLag", (g_network->networkInfo.metrics.secSquaredSubmit - statState->networkMetricsState.secSquaredSubmit) / currentStats.elapsed) + .detail("AIODiskStall", (g_network->networkInfo.metrics.secSquaredDiskStall - statState->networkMetricsState.secSquaredDiskStall) / currentStats.elapsed) .detail("CurrentConnections", netData.countConnEstablished - netData.countConnClosedWithError - netData.countConnClosedWithoutError) .detail("ConnectionsEstablished", (double) (netData.countConnEstablished - statState->networkState.countConnEstablished) / currentStats.elapsed) .detail("ConnectionsClosed", ((netData.countConnClosedWithError - statState->networkState.countConnClosedWithError) + (netData.countConnClosedWithoutError - statState->networkState.countConnClosedWithoutError)) / currentStats.elapsed) @@ -142,23 +142,22 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta .detail("ReactTime", netData.countReactTime - 
statState->networkState.countReactTime); for (int i = 0; inetworkMetrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) { + if (int c = g_network->networkInfo.metrics.countSlowEvents[i] - statState->networkMetricsState.countSlowEvents[i]) { n.detail(format("SlowTask%dM", 1 << i).c_str(), c); } } - for (int i = 0; i < NetworkMetrics::PRIORITY_BINS && g_network->networkMetrics.priorityBins[i] != TaskPriority::Zero; i++) { - if(g_network->networkMetrics.priorityBlocked[i]) { - double lastSegment = std::min(currentStats.elapsed, now() - g_network->networkMetrics.priorityTimer[i]); - g_network->networkMetrics.priorityBlockedDuration[i] += lastSegment; - g_network->networkMetrics.secSquaredPriorityBlocked[i] += lastSegment * lastSegment; - g_network->networkMetrics.priorityTimer[i] = now(); + for (int i = 0; i < NetworkMetrics::PRIORITY_BINS && g_network->networkInfo.metrics.priorityBins[i] != TaskPriority::Zero; i++) { + if(g_network->networkInfo.metrics.priorityBlocked[i]) { + g_network->networkInfo.metrics.priorityBlockedDuration[i] += now() - g_network->networkInfo.metrics.windowedPriorityTimer[i]; + g_network->networkInfo.metrics.priorityMaxBlockedDuration[i] = std::max(g_network->networkInfo.metrics.priorityMaxBlockedDuration[i], now() - g_network->networkInfo.metrics.priorityTimer[i]); + g_network->networkInfo.metrics.windowedPriorityTimer[i] = now(); } - double blocked = g_network->networkMetrics.priorityBlockedDuration[i] - statState->networkMetricsState.priorityBlockedDuration[i]; - double s2Blocked = g_network->networkMetrics.secSquaredPriorityBlocked[i] - statState->networkMetricsState.secSquaredPriorityBlocked[i]; - n.detail(format("PriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), blocked); - n.detail(format("SumOfSquaredPriorityBusy%d", g_network->networkMetrics.priorityBins[i]).c_str(), s2Blocked); + n.detail(format("PriorityBusy%d", g_network->networkInfo.metrics.priorityBins[i]).c_str(), 
std::min(currentStats.elapsed, g_network->networkInfo.metrics.priorityBlockedDuration[i] - statState->networkMetricsState.priorityBlockedDuration[i])); + n.detail(format("PriorityMaxBusy%d", g_network->networkInfo.metrics.priorityBins[i]).c_str(), g_network->networkInfo.metrics.priorityMaxBlockedDuration[i]); + + g_network->networkInfo.metrics.priorityMaxBlockedDuration[i] = 0; } n.trackLatest("NetworkMetrics"); @@ -288,7 +287,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta } } #endif - statState->networkMetricsState = g_network->networkMetrics; + statState->networkMetricsState = g_network->networkInfo.metrics; statState->networkState = netData; return currentStats; } diff --git a/flow/Trace.cpp b/flow/Trace.cpp index c40f026a94..5c788ffcca 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -113,7 +113,7 @@ struct SuppressionMap { }; TraceBatch g_traceBatch; -thread_local trace_clock_t g_trace_clock = TRACE_CLOCK_REALTIME; +std::atomic g_trace_clock{ TRACE_CLOCK_NOW }; LatestEventCache latestEventCache; SuppressionMap suppressedEvents; @@ -422,7 +422,7 @@ public: TraceEventFields rolledFields; for(auto itr = events[idx].begin(); itr != events[idx].end(); ++itr) { if(itr->first == "Time") { - rolledFields.addField("Time", format("%.6f", (g_trace_clock == TRACE_CLOCK_NOW) ? 
now() : timer())); + rolledFields.addField("Time", format("%.6f", TraceEvent::getCurrentTime())); rolledFields.addField("OriginalTime", itr->second); } else if(itr->first == "TrackLatestType") { @@ -724,26 +724,12 @@ bool TraceEvent::init() { if(enabled) { tmpEventMetric = new DynamicEventMetric(MetricNameRef()); - double time; - if(g_trace_clock == TRACE_CLOCK_NOW) { - if(!g_network) { - static double preNetworkTime = timer_monotonic(); - time = preNetworkTime; - } - else { - time = now(); - } - } - else { - time = timer(); - } - if(err.isValid() && err.isInjectedFault() && severity == SevError) { severity = SevWarnAlways; } detail("Severity", int(severity)); - detailf("Time", "%.6f", time); + detailf("Time", "%.6f", getCurrentTime()); detail("Type", type); if(g_network && g_network->isSimulated()) { NetworkAddress local = g_network->getLocalAddress(); @@ -994,13 +980,26 @@ thread_local bool TraceEvent::networkThread = false; void TraceEvent::setNetworkThread() { traceEventThrottlerCache = new TransientThresholdMetricSample>(FLOW_KNOBS->TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, FLOW_KNOBS->TRACE_EVENT_THROTTLER_MSG_LIMIT); networkThread = true; - g_trace_clock = TRACE_CLOCK_NOW; } bool TraceEvent::isNetworkThread() { return networkThread; } +double TraceEvent::getCurrentTime() { + if(g_trace_clock.load() == TRACE_CLOCK_NOW) { + if(!isNetworkThread() || !g_network) { + return timer_monotonic(); + } + else { + return now(); + } + } + else { + return timer(); + } +} + TraceInterval& TraceInterval::begin() { pairID = nondeterministicRandom()->randomUniqueID(); count = 0; @@ -1008,20 +1007,20 @@ TraceInterval& TraceInterval::begin() { } void TraceBatch::addEvent( const char *name, uint64_t id, const char *location ) { - eventBatch.push_back( EventInfo(g_trace_clock == TRACE_CLOCK_NOW ? 
now() : timer(), name, id, location)); + eventBatch.push_back( EventInfo(TraceEvent::getCurrentTime(), name, id, location)); if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP ) dump(); } void TraceBatch::addAttach( const char *name, uint64_t id, uint64_t to ) { - attachBatch.push_back( AttachInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), name, id, to)); + attachBatch.push_back( AttachInfo(TraceEvent::getCurrentTime(), name, id, to)); if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP ) dump(); } void TraceBatch::addBuggify( int activated, int line, std::string file ) { if( g_network ) { - buggifyBatch.push_back( BuggifyInfo(g_trace_clock == TRACE_CLOCK_NOW ? now() : timer(), activated, line, file)); + buggifyBatch.push_back( BuggifyInfo(TraceEvent::getCurrentTime(), activated, line, file)); if( g_network->isSimulated() || FLOW_KNOBS->AUTOMATIC_TRACE_DUMP ) dump(); } else { diff --git a/flow/Trace.h b/flow/Trace.h index 7840db7c43..1cb0e9da2d 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -22,6 +22,7 @@ #define FLOW_TRACE_H #pragma once +#include #include #include #include @@ -381,6 +382,8 @@ struct TraceEvent { static void setNetworkThread(); static bool isNetworkThread(); + static double getCurrentTime(); + //Must be called directly after constructing the trace event TraceEvent& error(const class Error& e, bool includeCancelled=false) { if (enabled) { @@ -572,7 +575,7 @@ void addTraceRole(std::string role); void removeTraceRole(std::string role); enum trace_clock_t { TRACE_CLOCK_NOW, TRACE_CLOCK_REALTIME }; -extern thread_local trace_clock_t g_trace_clock; +extern std::atomic g_trace_clock; extern TraceBatch g_traceBatch; #define DUMPTOKEN(name) \ diff --git a/flow/network.h b/flow/network.h index 51239c8bb1..531955208a 100644 --- a/flow/network.h +++ b/flow/network.h @@ -59,9 +59,14 @@ enum class TaskPriority { TLogCommitReply = 8580, TLogCommit = 8570, ProxyGetRawCommittedVersion = 8565, - ProxyResolverReply = 8560, - 
ProxyCommitBatcher = 8550, - ProxyCommit = 8540, + ProxyCommitYield3 = 8562, + ProxyTLogCommitReply = 8560, + ProxyCommitYield2 = 8557, + ProxyResolverReply = 8555, + ProxyMasterVersionReply = 8550, + ProxyCommitYield1 = 8547, + ProxyCommit = 8545, + ProxyCommitBatcher = 8540, TLogConfirmRunningReply = 8530, TLogConfirmRunning = 8520, ProxyGRVTimer = 8510, @@ -300,24 +305,31 @@ template class Promise; struct NetworkMetrics { enum { SLOW_EVENT_BINS = 16 }; - uint64_t countSlowEvents[SLOW_EVENT_BINS]; + uint64_t countSlowEvents[SLOW_EVENT_BINS] = {}; enum { PRIORITY_BINS = 9 }; - TaskPriority priorityBins[ PRIORITY_BINS ]; - bool priorityBlocked[PRIORITY_BINS]; - double priorityBlockedDuration[PRIORITY_BINS]; - double secSquaredPriorityBlocked[PRIORITY_BINS]; - double priorityTimer[PRIORITY_BINS]; + TaskPriority priorityBins[PRIORITY_BINS] = {}; + bool priorityBlocked[PRIORITY_BINS] = {}; + double priorityBlockedDuration[PRIORITY_BINS] = {}; + double priorityMaxBlockedDuration[PRIORITY_BINS] = {}; + double priorityTimer[PRIORITY_BINS] = {}; + double windowedPriorityTimer[PRIORITY_BINS] = {}; - double oldestAlternativesFailure; - double newestAlternativesFailure; - double lastAlternativesFailureSkipDelay; - double lastSync; + double secSquaredSubmit = 0; + double secSquaredDiskStall = 0; - double secSquaredSubmit; - double secSquaredDiskStall; + NetworkMetrics() {} +}; - NetworkMetrics() { memset(this, 0, sizeof(*this)); } +struct NetworkInfo { + NetworkMetrics metrics; + double oldestAlternativesFailure = 0; + double newestAlternativesFailure = 0; + double lastAlternativesFailureSkipDelay = 0; + + std::map, double> serverTLSConnectionThrottler; + + NetworkInfo() {} }; class IEventFD : public ReferenceCounted { @@ -468,7 +480,7 @@ public: return (netAddressesFuncPtr) ? 
reinterpret_cast(netAddressesFuncPtr)() : NetworkAddressList(); } - NetworkMetrics networkMetrics; + NetworkInfo networkInfo; protected: INetwork() {} diff --git a/packaging/msi/FDBInstaller.wxs b/packaging/msi/FDBInstaller.wxs index 55e5e10e74..cc4357a0ed 100644 --- a/packaging/msi/FDBInstaller.wxs +++ b/packaging/msi/FDBInstaller.wxs @@ -32,7 +32,7 @@