Merge branch 'master' of https://github.com/apple/foundationdb into jfu-fix-snapshot-backup-agent

Jon Fu 2020-10-21 13:33:28 -04:00
commit 896c4ccc62
35 changed files with 449 additions and 271 deletions

View File

@ -7,7 +7,7 @@ ADD artifacts /mnt/artifacts
# Install build tools for building via make
RUN \
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm cgdb
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm cgdb jq
# Download and install llvm-10.0.0
RUN cd / &&\
@ -50,8 +50,8 @@ RUN cp -iv /usr/local/bin/clang++ /usr/local/bin/clang++.deref &&\
ldconfig &&\
rm -rf /mnt/artifacts
LABEL version=0.11.8
ENV DOCKER_IMAGEVER=0.11.8
LABEL version=0.11.9
ENV DOCKER_IMAGEVER=0.11.9
ENV CLANGCC=/usr/local/bin/clang.de8a65ef
ENV CLANGCXX=/usr/local/bin/clang++.de8a65ef

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.3.8.pkg <https://www.foundationdb.org/downloads/6.3.8/macOS/installers/FoundationDB-6.3.8.pkg>`_
* `FoundationDB-6.3.9.pkg <https://www.foundationdb.org/downloads/6.3.9/macOS/installers/FoundationDB-6.3.9.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.3.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.8/ubuntu/installers/foundationdb-clients_6.3.8-1_amd64.deb>`_
* `foundationdb-server-6.3.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.8/ubuntu/installers/foundationdb-server_6.3.8-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.3.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.9/ubuntu/installers/foundationdb-clients_6.3.9-1_amd64.deb>`_
* `foundationdb-server-6.3.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.9/ubuntu/installers/foundationdb-server_6.3.9-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.3.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel6/installers/foundationdb-clients-6.3.8-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel6/installers/foundationdb-server-6.3.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel6/installers/foundationdb-clients-6.3.9-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel6/installers/foundationdb-server-6.3.9-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.3.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel7/installers/foundationdb-clients-6.3.8-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel7/installers/foundationdb-server-6.3.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel7/installers/foundationdb-clients-6.3.9-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel7/installers/foundationdb-server-6.3.9-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.3.8-x64.msi <https://www.foundationdb.org/downloads/6.3.8/windows/installers/foundationdb-6.3.8-x64.msi>`_
* `foundationdb-6.3.9-x64.msi <https://www.foundationdb.org/downloads/6.3.9/windows/installers/foundationdb-6.3.9-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
* `foundationdb-6.3.8.tar.gz <https://www.foundationdb.org/downloads/6.3.8/bindings/python/foundationdb-6.3.8.tar.gz>`_
* `foundationdb-6.3.9.tar.gz <https://www.foundationdb.org/downloads/6.3.9/bindings/python/foundationdb-6.3.9.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.3.8.gem <https://www.foundationdb.org/downloads/6.3.8/bindings/ruby/fdb-6.3.8.gem>`_
* `fdb-6.3.9.gem <https://www.foundationdb.org/downloads/6.3.9/bindings/ruby/fdb-6.3.9.gem>`_
Java 8+
-------
* `fdb-java-6.3.8.jar <https://www.foundationdb.org/downloads/6.3.8/bindings/java/fdb-java-6.3.8.jar>`_
* `fdb-java-6.3.8-javadoc.jar <https://www.foundationdb.org/downloads/6.3.8/bindings/java/fdb-java-6.3.8-javadoc.jar>`_
* `fdb-java-6.3.9.jar <https://www.foundationdb.org/downloads/6.3.9/bindings/java/fdb-java-6.3.9.jar>`_
* `fdb-java-6.3.9-javadoc.jar <https://www.foundationdb.org/downloads/6.3.9/bindings/java/fdb-java-6.3.9-javadoc.jar>`_
Go 1.11+
--------

View File

@ -2,9 +2,17 @@
Release Notes
#############
6.2.27
======
* For clusters with a large number of shards, avoid slow tasks in the data distributor by adding yields to the shard map destruction. `(PR #3834) <https://github.com/apple/foundationdb/pull/3834>`_
* Reset the network connection between a proxy and master or resolvers if the proxy is too far behind in processing transactions. `(PR #3891) <https://github.com/apple/foundationdb/pull/3891>`_
6.2.26
======
* Attempt to detect when calling :func:`fdb_future_block_until_ready` would cause a deadlock, and throw ``blocked_from_network_thread`` if it would definitely cause a deadlock.
* Fixed undefined behavior in configuring supported FoundationDB versions while starting up a client. `(PR #3849) <https://github.com/apple/foundationdb/pull/3849>`_
* Updated OpenSSL to version 1.1.1h. `(PR #3809) <https://github.com/apple/foundationdb/pull/3809>`_
* Attempt to detect when calling :func:`fdb_future_block_until_ready` would cause a deadlock, and throw ``blocked_from_network_thread`` if it would definitely cause a deadlock. `(PR #3786) <https://github.com/apple/foundationdb/pull/3786>`_
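As a minimal illustration of the pattern this check enforces (a sketch, not code from this repository): the C API expects ``fdb_run_network()`` to occupy its own thread, and ``fdb_future_block_until_ready()`` to be called only from other threads; blocking from the network thread itself is exactly the deadlock now reported as ``blocked_from_network_thread``. Assuming API version 620 and the default cluster file::

    #define FDB_API_VERSION 620
    #include <foundationdb/fdb_c.h>
    #include <thread>
    #include <cstdio>

    int main() {
        fdb_select_api_version(FDB_API_VERSION);
        fdb_setup_network();
        // fdb_run_network() blocks, so it gets a dedicated thread; application
        // threads then remain free to block on futures without deadlocking it.
        std::thread net([] { fdb_run_network(); });

        FDBDatabase* db = nullptr;
        fdb_create_database(nullptr, &db); // nullptr = default cluster file
        FDBTransaction* tr = nullptr;
        fdb_database_create_transaction(db, &tr);

        FDBFuture* f = fdb_transaction_get_read_version(tr);
        // Safe: this is not the network thread, so blocking here cannot deadlock.
        fdb_error_t err = fdb_future_block_until_ready(f);
        if (!err) err = fdb_future_get_error(f);
        std::printf("get read version: %s\n", err ? fdb_get_error(err) : "ok");

        fdb_future_destroy(f);
        fdb_transaction_destroy(tr);
        fdb_database_destroy(db);
        fdb_stop_network();
        net.join();
        return 0;
    }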
6.2.25
======
@ -15,50 +23,31 @@ Release Notes
6.2.24
======
Features
--------
* Added the ``suspend`` command to ``fdbcli`` which kills a process and prevents it from rejoining the cluster for a specified duration. `(PR #3550) <https://github.com/apple/foundationdb/pull/3550>`_
6.2.23
======
Fixes
-----
* When configured with ``usable_regions=2``, data distribution could temporarily lower the replication of a shard when moving it. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
* Prevent data distribution from running out of memory by fetching the source servers for too many shards in parallel. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
* Reset network connections between log routers and satellite tlogs if the latencies are larger than 500ms. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
Status
------
* Added per-process server request latency statistics reported in the role section of relevant processes. These are named ``grv_latency_statistics`` and ``commit_latency_statistics`` on proxy roles and ``read_latency_statistics`` on storage roles. `(PR #3480) <https://github.com/apple/foundationdb/pull/3480>`_
* Added ``cluster.active_primary_dc`` that indicates which datacenter is serving as the primary datacenter in multi-region setups. `(PR #3320) <https://github.com/apple/foundationdb/pull/3320>`_
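Both additions appear in the ``status json`` document, so they can be inspected with any JSON tool, for example by piping the output of ``fdbcli --exec 'status json'`` through a tool such as ``jq``; the exact JSON paths depend on which roles a given process is running.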
6.2.22
======
Fixes
-----
* Coordinator class processes could be recruited as the cluster controller. `(PR #3282) <https://github.com/apple/foundationdb/pull/3282>`_
* HTTPS requests made by backup would fail (introduced in 6.2.21). `(PR #3284) <https://github.com/apple/foundationdb/pull/3284>`_
6.2.21
======
Fixes
-----
* HTTPS requests made by backup could hang indefinitely. `(PR #3027) <https://github.com/apple/foundationdb/pull/3027>`_
* ``fdbrestore`` prefix options required exactly a single hyphen instead of the standard two. `(PR #3056) <https://github.com/apple/foundationdb/pull/3056>`_
* Commits could stall on a newly elected proxy because of inaccurate compute estimates. `(PR #3123) <https://github.com/apple/foundationdb/pull/3123>`_
* A transaction class process with a bad disk could be repeatedly recruited as a transaction log. `(PR #3268) <https://github.com/apple/foundationdb/pull/3268>`_
* Fix a potential race condition that could lead to undefined behavior when connecting to a database using the multi-version client API. `(PR #3265) <https://github.com/apple/foundationdb/pull/3265>`_
Features
--------
* Added the ``getversion`` command to ``fdbcli`` which returns the current read version of the cluster. `(PR #2882) <https://github.com/apple/foundationdb/pull/2882>`_
* Added the ``advanceversion`` command to ``fdbcli`` which increases the current version of a cluster. `(PR #2965) <https://github.com/apple/foundationdb/pull/2965>`_
* Added the ``lock`` and ``unlock`` commands to ``fdbcli`` which lock or unlock a cluster. `(PR #2890) <https://github.com/apple/foundationdb/pull/2890>`_
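For reference, these are invoked from a shell as, for example, ``fdbcli --exec 'getversion'`` or ``fdbcli --exec 'advanceversion <version>'``; the forms shown here are illustrative, and ``help`` inside ``fdbcli`` gives the exact syntax for each command.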
@ -66,9 +55,6 @@ Features
6.2.20
======
Fixes
-----
* In rare scenarios, clients could send corrupted data to the server. `(PR #2976) <https://github.com/apple/foundationdb/pull/2976>`_
* Internal tools like ``fdbbackup`` are no longer tracked as clients in status (introduced in 6.2.18) `(PR #2849) <https://github.com/apple/foundationdb/pull/2849>`_
* Changed TLS error handling to match the behavior of 6.2.15. `(PR #2993) <https://github.com/apple/foundationdb/pull/2993>`_ `(PR #2977) <https://github.com/apple/foundationdb/pull/2977>`_
@ -76,9 +62,6 @@ Fixes
6.2.19
======
Fixes
-----
* Protect the proxies from running out of memory when bombarded with requests from clients. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* One process with a ``proxy`` class would not become the first proxy when put with other ``stateless`` class processes. `(PR #2819) <https://github.com/apple/foundationdb/pull/2819>`_.
* If a transaction log stalled on a disk operation during recruitment the cluster would become unavailable until the process died. `(PR #2815) <https://github.com/apple/foundationdb/pull/2815>`_.
@ -86,70 +69,37 @@ Fixes
* Prevent the cluster from having too many active generations as a safety measure against repeated failures. `(PR #2814) <https://github.com/apple/foundationdb/pull/2814>`_.
* ``fdbcli`` status JSON could become truncated because of unprintable characters. `(PR #2807) <https://github.com/apple/foundationdb/pull/2807>`_.
* The data distributor used too much CPU in large clusters (broken in 6.2.16). `(PR #2806) <https://github.com/apple/foundationdb/pull/2806>`_.
Status
------
* Added ``cluster.workload.operations.memory_errors`` to measure the number of requests rejected by the proxies because the memory limit has been exceeded. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* Added ``cluster.workload.operations.location_requests`` to measure the number of outgoing key server location responses from the proxies. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* Added ``cluster.recovery_state.active_generations`` to track the number of generations for which the cluster still requires transaction logs. `(PR #2814) <https://github.com/apple/foundationdb/pull/2814>`_.
* Added ``network.tls_policy_failures`` to the ``processes`` section to record the number of TLS policy failures each process has observed. `(PR #2811) <https://github.com/apple/foundationdb/pull/2811>`_.
Features
--------
* Added ``--debug-tls`` as a command line argument to ``fdbcli`` to help diagnose TLS issues. `(PR #2810) <https://github.com/apple/foundationdb/pull/2810>`_.
6.2.18
======
Fixes
-----
* When configuring a cluster to usable_regions=2, data distribution would not react to machine failures while copying data to the remote region. `(PR #2774) <https://github.com/apple/foundationdb/pull/2774>`_.
* When a cluster is configured with usable_regions=2, data distribution could push a cluster into saturation by relocating too many shards simultaneously. `(PR #2776) <https://github.com/apple/foundationdb/pull/2776>`_.
* Do not allow the cluster controller to mark any process as failed within 30 seconds of startup. `(PR #2780) <https://github.com/apple/foundationdb/pull/2780>`_.
* Backup could not establish TLS connections (broken in 6.2.16). `(PR #2775) <https://github.com/apple/foundationdb/pull/2775>`_.
* Certificates were not refreshed automatically (broken in 6.2.16). `(PR #2781) <https://github.com/apple/foundationdb/pull/2781>`_.
Performance
-----------
* Improved the efficiency of establishing large numbers of network connections. `(PR #2777) <https://github.com/apple/foundationdb/pull/2777>`_.
Features
--------
* Add support for setting knobs to modify the behavior of ``fdbcli``. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
Other Changes
-------------
* Setting invalid knobs in backup and DR binaries is now a warning instead of an error and will not result in the application being terminated. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
6.2.17
======
Fixes
-----
* Restored the ability to set TLS configuration using environment variables (broken in 6.2.16). `(PR #2755) <https://github.com/apple/foundationdb/pull/2755>`_.
6.2.16
======
Performance
-----------
* Reduced tail commit latencies by improving commit pipelining on the proxies. `(PR #2589) <https://github.com/apple/foundationdb/pull/2589>`_.
* Data distribution does a better job balancing data when disks are more than 70% full. `(PR #2722) <https://github.com/apple/foundationdb/pull/2722>`_.
* Reverse range reads could read too much data from disk, resulting in poor performance relative to forward range reads. `(PR #2650) <https://github.com/apple/foundationdb/pull/2650>`_.
* Switched from LibreSSL to OpenSSL to improve the speed of establishing connections. `(PR #2646) <https://github.com/apple/foundationdb/pull/2646>`_.
* The cluster controller does a better job avoiding multiple recoveries when first recruited. `(PR #2698) <https://github.com/apple/foundationdb/pull/2698>`_.
Fixes
-----
* Storage servers could fail to advance their version correctly in response to empty commits. `(PR #2617) <https://github.com/apple/foundationdb/pull/2617>`_.
* Status could not label more than 5 processes as proxies. `(PR #2653) <https://github.com/apple/foundationdb/pull/2653>`_.
* The ``TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER``, ``TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS``, ``TR_FLAG_DISABLE_SERVER_TEAM_REMOVER``, and ``BUGGIFY_ALL_COORDINATION`` knobs could not be set at runtime. `(PR #2661) <https://github.com/apple/foundationdb/pull/2661>`_.
@ -161,17 +111,11 @@ Fixes
6.2.15
======
Fixes
-----
* TLS throttling could block legitimate connections. `(PR #2575) <https://github.com/apple/foundationdb/pull/2575>`_.
6.2.14
======
Fixes
-----
* Data distribution was prioritizing shard merges too highly. `(PR #2562) <https://github.com/apple/foundationdb/pull/2562>`_.
* Status would incorrectly mark clusters as having no fault tolerance. `(PR #2562) <https://github.com/apple/foundationdb/pull/2562>`_.
* A proxy could run out of memory if disconnected from the cluster for too long. `(PR #2562) <https://github.com/apple/foundationdb/pull/2562>`_.
@ -179,26 +123,16 @@ Fixes
6.2.13
======
Performance
-----------
* Optimized the commit path on the proxies to significantly reduce commit latencies in large clusters. `(PR #2536) <https://github.com/apple/foundationdb/pull/2536>`_.
* Data distribution could create temporarily untrackable shards which could not be split if they became hot. `(PR #2546) <https://github.com/apple/foundationdb/pull/2546>`_.
6.2.12
======
Performance
-----------
* Throttle TLS connect attempts from misconfigured clients. `(PR #2529) <https://github.com/apple/foundationdb/pull/2529>`_.
* Reduced master recovery times in large clusters. `(PR #2430) <https://github.com/apple/foundationdb/pull/2430>`_.
* Improved performance while a remote region is catching up. `(PR #2527) <https://github.com/apple/foundationdb/pull/2527>`_.
* The data distribution algorithm does a better job preventing hot shards while recovering from machine failures. `(PR #2526) <https://github.com/apple/foundationdb/pull/2526>`_.
Fixes
-----
* Improve the reliability of a ``kill`` command from ``fdbcli``. `(PR #2512) <https://github.com/apple/foundationdb/pull/2512>`_.
* The ``--traceclock`` parameter to fdbserver incorrectly had no effect. `(PR #2420) <https://github.com/apple/foundationdb/pull/2420>`_.
* Clients could throw an internal error during ``commit`` if client buggification was enabled. `(PR #2427) <https://github.com/apple/foundationdb/pull/2427>`_.
@ -208,9 +142,6 @@ Fixes
6.2.11
======
Fixes
-----
* Clients could hang indefinitely on reads if all storage servers holding a keyrange were removed from a cluster since the last time the client read a key in the range. `(PR #2377) <https://github.com/apple/foundationdb/pull/2377>`_.
* In rare scenarios, status could falsely report no replicas remain of some data. `(PR #2380) <https://github.com/apple/foundationdb/pull/2380>`_.
* Latency band tracking could fail to configure correctly after a recovery or upon process startup. `(PR #2371) <https://github.com/apple/foundationdb/pull/2371>`_.
@ -218,17 +149,11 @@ Fixes
6.2.10
======
Fixes
-----
* ``backup_agent`` crashed on startup. `(PR #2356) <https://github.com/apple/foundationdb/pull/2356>`_.
6.2.9
=====
Fixes
-----
* Small clusters using specific sets of process classes could cause the data distributor to be continuously killed and re-recruited. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* The data distributor and ratekeeper could be recruited on non-optimal processes. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* A ``kill`` command from ``fdbcli`` could take a long time before being executed by a busy process. `(PR #2339) <https://github.com/apple/foundationdb/pull/2339>`_.
@ -238,9 +163,6 @@ Fixes
6.2.8
=====
Fixes
-----
* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_ `(PR #2323) <https://github.com/apple/foundationdb/pull/2323>`_.
* The ``system_kv_size_bytes`` status field could report a size much larger than the actual size of the system keyspace. `(PR #2305) <https://github.com/apple/foundationdb/pull/2305>`_.

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.3.8
6.3.9
=====
Features
@ -61,6 +61,8 @@ Fixes
* In very rare scenarios, the data distributor process would crash when being shutdown. `(PR #3530) <https://github.com/apple/foundationdb/pull/3530>`_
* The master would die immediately if it did not have the correct cluster controller interface when recruited. [6.3.4] `(PR #3537) <https://github.com/apple/foundationdb/pull/3537>`_
* Fix an issue where ``fdbcli --exec 'exclude no_wait ...'`` would incorrectly report that processes can safely be removed from the cluster. [6.3.5] `(PR #3566) <https://github.com/apple/foundationdb/pull/3566>`_
* Commit latencies could become large because of inaccurate compute estimates. [6.3.9] `(PR #3845) <https://github.com/apple/foundationdb/pull/3845>`_
* Added a timeout on TLS handshakes to prevent them from hanging indefinitely. [6.3.9] `(PR #3850) <https://github.com/apple/foundationdb/pull/3850>`_
Status
------
@ -130,6 +132,7 @@ Fixes only impacting 6.3.0+
* Adjusted the proxy load balancing algorithm to be based on the CPU usage of the process instead of the number of requests processed. [6.3.5] `(PR #3653) <https://github.com/apple/foundationdb/pull/3653>`_
* Only return the error code ``batch_transaction_throttled`` for API versions greater than or equal to 630. [6.3.6] `(PR #3799) <https://github.com/apple/foundationdb/pull/3799>`_
* The fault tolerance calculation in status did not take into account region configurations. [6.3.8] `(PR #3836) <https://github.com/apple/foundationdb/pull/3836>`_
* Get read version tail latencies were high because some proxies were serving more read versions than other proxies. [6.3.9] `(PR #3845) <https://github.com/apple/foundationdb/pull/3845>`_
Earlier release notes
---------------------

View File

@ -257,7 +257,7 @@ struct Traceable<std::set<T>> : std::true_type {
std::string printable( const StringRef& val );
std::string printable( const std::string& val );
std::string printable( const KeyRangeRef& range );
std::string printable(const VectorRef<KeyRangeRef>& val);
std::string printable( const VectorRef<KeyRangeRef>& val);
std::string printable( const VectorRef<StringRef>& val );
std::string printable( const VectorRef<KeyValueRef>& val );
std::string printable( const KeyValueRef& val );

View File

@ -1185,9 +1185,9 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if (clientInfo->get().commitProxies.size())
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies));
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies, false));
if( clientInfo->get().grvProxies.size() )
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies ) );
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies, true) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -1197,9 +1197,9 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::DATACENTER_ID:
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if (clientInfo->get().commitProxies.size())
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies));
commitProxies = Reference<CommitProxyInfo>( new CommitProxyInfo(clientInfo->get().commitProxies, false));
if( clientInfo->get().grvProxies.size() )
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies ));
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies, true));
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -1585,11 +1585,11 @@ void DatabaseContext::updateProxies() {
grvProxies.clear();
bool commitProxyProvisional = false, grvProxyProvisional = false;
if (clientInfo->get().commitProxies.size()) {
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies));
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies, false));
commitProxyProvisional = clientInfo->get().commitProxies[0].provisional;
}
if (clientInfo->get().grvProxies.size()) {
grvProxies = Reference<GrvProxyInfo>(new GrvProxyInfo(clientInfo->get().grvProxies));
grvProxies = Reference<GrvProxyInfo>(new GrvProxyInfo(clientInfo->get().grvProxies, true));
grvProxyProvisional = clientInfo->get().grvProxies[0].provisional;
}
if (clientInfo->get().commitProxies.size() && clientInfo->get().grvProxies.size()) {

View File

@ -1313,7 +1313,8 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
bool reverse )
{
if (getDatabase()->apiVersionAtLeast(630)) {
if (specialKeys.contains(begin.getKey()) && end.getKey() <= specialKeys.end) {
if (specialKeys.contains(begin.getKey()) && specialKeys.begin <= end.getKey() &&
end.getKey() <= specialKeys.end) {
TEST(true); // Special key space get range
return getDatabase()->specialKeySpace->getRange(this, begin, end, limits, reverse);
}

View File

@ -151,7 +151,7 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
if (!ks->isFirstGreaterOrEqual()) {
// The Key Selector clamps up to the legal key space
TraceEvent(SevInfo, "ReadToBoundary")
TraceEvent(SevDebug, "ReadToBoundary")
.detail("TerminateKey", ks->getKey())
.detail("TerminateOffset", ks->offset);
if (ks->offset < 1)

View File

@ -186,16 +186,7 @@ struct PingReceiver final : NetworkMessageReceiver {
class TransportData {
public:
TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
transportId(transportId),
numIncompatibleConnections(0)
{
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
}
TransportData(uint64_t transportId);
~TransportData();
@ -219,6 +210,7 @@ public:
std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
HealthMonitor healthMonitor;
std::set<NetworkAddress> orderedAddresses;
Reference<AsyncVar<bool>> degraded;
bool warnAlwaysForLargePacket;
@ -243,8 +235,58 @@ public:
uint64_t transportId;
Future<Void> multiVersionCleanup;
Future<Void> pingLogger;
};
ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
state NetworkAddress lastAddress = NetworkAddress();
loop {
if(self->orderedAddresses.size()) {
auto it = self->orderedAddresses.upper_bound(lastAddress);
if(it == self->orderedAddresses.end()) {
it = self->orderedAddresses.begin();
}
lastAddress = *it;
auto peer = self->getPeer(lastAddress);
if(!peer) {
TraceEvent(SevWarnAlways, "MissingNetworkAddress").suppressFor(10.0).detail("PeerAddr", lastAddress);
}
if(peer && peer->pingLatencies.getPopulationSize() >= 10) {
TraceEvent("PingLatency")
.detail("PeerAddr", lastAddress)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("P90Latency", peer->pingLatencies.percentile(0.90))
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("BytesReceived", peer->bytesReceived - peer->lastLoggedBytesReceived)
.detail("BytesSent", peer->bytesSent - peer->lastLoggedBytesSent);
peer->pingLatencies.clear();
peer->lastLoggedBytesReceived = peer->bytesReceived;
peer->lastLoggedBytesSent = peer->bytesSent;
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
} else if(it == self->orderedAddresses.begin()) {
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
}
} else {
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
}
}
}
TransportData::TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
transportId(transportId),
numIncompatibleConnections(0)
{
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
pingLogger = pingLatencyLogger(this);
}
#define CONNECT_PACKET_V0 0x0FDB00A444020001LL
#define CONNECT_PACKET_V0_SIZE 14
@ -370,10 +412,14 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePingEndpoint, true );
state int64_t startingBytes = peer->bytesReceived;
state int timeouts = 0;
state double startTime = now();
loop {
choose {
when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) {
if(startingBytes == peer->bytesReceived) {
if(peer->destination.isPublic()) {
peer->pingLatencies.addSample(now() - startTime);
}
TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination);
throw connection_failed();
}
@ -387,6 +433,9 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
timeouts++;
}
when (wait( reply.getFuture() )) {
if(peer->destination.isPublic()) {
peer->pingLatencies.addSample(now() - startTime);
}
break;
}
when (wait( peer->resetPing.onTrigger())) {
@ -409,9 +458,9 @@ ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection
loop {
lastWriteTime = now();
int sent = conn->write(self->unsent.getUnsent(), FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if (sent != 0) {
int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if (sent) {
self->bytesSent += sent;
self->transport->bytesSent += sent;
self->unsent.sent(sent);
}
@ -658,6 +707,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
self->transport->orderedAddresses.erase(self->destination);
return Void();
}
}
@ -667,8 +717,9 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
Peer::Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0),
bytesSent(0), lastLoggedBytesSent(0) {
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}
@ -1146,6 +1197,9 @@ Reference<Peer> TransportData::getOrOpenPeer( NetworkAddress const& address, boo
peer->connect = connectionKeeper(peer);
}
peers[address] = peer;
if(address.isPublic()) {
orderedAddresses.insert(address);
}
}
return peer;

View File

@ -28,6 +28,7 @@
#include "flow/network.h"
#include "flow/FileIdentifier.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/ContinuousSample.h"
#pragma pack(push, 4)
class Endpoint {
@ -140,8 +141,12 @@ struct Peer : public ReferenceCounted<Peer> {
int peerReferences;
bool incompatibleProtocolVersionNewer;
int64_t bytesReceived;
int64_t bytesSent;
double lastDataPacketSentTime;
int outstandingReplies;
ContinuousSample<double> pingLatencies;
int64_t lastLoggedBytesReceived;
int64_t lastLoggedBytesSent;
explicit Peer(TransportData* transport, NetworkAddress const& destination);

View File

@ -86,7 +86,10 @@ struct AlternativeInfo {
template <class T>
class ModelInterface : public ReferenceCounted<ModelInterface<T>> {
public:
ModelInterface( const vector<T>& v ) {
//If balanceOnRequests is true, the client will load balance based on the number of GRVs released by each proxy
//If balanceOnRequests is false, the client will load balance based on the CPU usage of each proxy
//Only requests which take from the GRV budget on the proxy should set balanceOnRequests to true
ModelInterface( const vector<T>& v, bool balanceOnRequests ) : balanceOnRequests(balanceOnRequests) {
for(int i = 0; i < v.size(); i++) {
alternatives.push_back(AlternativeInfo(v[i], 1.0/v.size(), (i+1.0)/v.size()));
}
@ -111,22 +114,26 @@ public:
}
void updateProbabilities() {
double totalBusyTime = 0;
double totalBusy = 0;
for(auto& it : alternatives) {
totalBusyTime += it.processBusyTime;
int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION :
it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
totalBusy += busyMetric;
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
return;
}
}
//Do not update probabilities if the average proxy busyness is less than 5%
if(totalBusyTime < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_AMOUNT*alternatives.size()) {
if((balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_REQUESTS*alternatives.size()) ||
(!balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_CPU*alternatives.size())) {
return;
}
double totalProbability = 0;
for(auto& it : alternatives) {
it.probability += (1.0/alternatives.size()-(it.processBusyTime/totalBusyTime))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION :
it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
it.probability += (1.0/alternatives.size()-(busyMetric/totalBusy))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
totalProbability += it.probability;
@ -160,6 +167,7 @@ public:
private:
vector<AlternativeInfo<T>> alternatives;
Future<Void> updater;
bool balanceOnRequests;
};
template <class T>
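A standalone sketch of the decode-and-nudge logic described by the comments above; the PRECISION, MAX_CHANGE, and MAX_PROB constants below are illustrative assumptions standing in for the knobs, not the shipped defaults:

    // Illustrative sketch of ModelInterface-style probability updates; not production code.
    #include <algorithm>
    #include <cstdio>
    #include <vector>

    int main() {
        const long long PRECISION = 1000000; // stands in for BASIC_LOAD_BALANCE_COMPUTE_PRECISION
        const double MAX_CHANGE = 0.05;      // stands in for BASIC_LOAD_BALANCE_MAX_CHANGE
        const double MAX_PROB = 2.0;         // stands in for BASIC_LOAD_BALANCE_MAX_PROB

        // processBusyTime packs two metrics per proxy: requests in the high part, CPU in the low part.
        std::vector<long long> processBusyTime = { 12 * PRECISION + 300000,   // 12 GRVs, 30% CPU
                                                   4  * PRECISION + 900000,   //  4 GRVs, 90% CPU
                                                   8  * PRECISION + 500000 }; //  8 GRVs, 50% CPU
        const bool balanceOnRequests = true; // GRV requests draw from the proxy's GRV budget

        std::vector<double> probability(processBusyTime.size(), 1.0 / processBusyTime.size());
        std::vector<long long> busyMetric;
        double totalBusy = 0;
        for (long long b : processBusyTime) {
            busyMetric.push_back(balanceOnRequests ? b / PRECISION : b % PRECISION);
            totalBusy += busyMetric.back();
        }

        double totalProbability = 0;
        for (size_t i = 0; i < probability.size(); i++) {
            // Nudge each alternative toward an equal share of the observed busyness, then clamp.
            probability[i] += (1.0 / probability.size() - busyMetric[i] / totalBusy) * MAX_CHANGE;
            probability[i] = std::max(probability[i], 1 / (MAX_PROB * probability.size()));
            probability[i] = std::min(probability[i], MAX_PROB / probability.size());
            totalProbability += probability[i];
        }
        for (size_t i = 0; i < probability.size(); i++)
            std::printf("proxy %zu: probability %.3f\n", i, probability[i] / totalProbability);
        return 0;
    }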

View File

@ -189,6 +189,8 @@ public:
void insert( const Range& keys, const Val& value );
Future<Void> clearAsync() { return map.clearAsync(); }
protected:
Map<Key,Val,pair_type,Metric> map;
const MetricFunc mf;

View File

@ -429,7 +429,7 @@ struct BackupData {
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
state Span span("BA:GetMinCommittedVersion"_loc);
loop {
GetReadVersionRequest request(span.context, 1, TransactionPriority::DEFAULT,
GetReadVersionRequest request(span.context, 0, TransactionPriority::DEFAULT,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onProxiesChanged())) {}

View File

@ -536,6 +536,16 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
state const Optional<UID>& debugID = self->debugID;
state Span span("MP:preresolutionProcessing"_loc, self->span.context);
if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchResolving.get() >
SERVER_KNOBS->RESET_MASTER_BATCHES &&
now() - self->pProxyCommitData->lastMasterReset > SERVER_KNOBS->RESET_MASTER_DELAY) {
TraceEvent(SevWarnAlways, "ResetMasterNetwork")
.detail("CurrentBatch", self->localBatchNumber)
.detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchResolving.get());
FlowTransport::transport().resetConnection(self->pProxyCommitData->master.address());
self->pProxyCommitData->lastMasterReset = now();
}
// Pre-resolution the commits
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1);
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
@ -630,6 +640,18 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
pProxyCommitData, self->releaseDelay, self->localBatchNumber
);
if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchLogging.get() >
SERVER_KNOBS->RESET_RESOLVER_BATCHES &&
now() - self->pProxyCommitData->lastResolverReset > SERVER_KNOBS->RESET_RESOLVER_DELAY) {
TraceEvent(SevWarnAlways, "ResetResolverNetwork")
.detail("CurrentBatch", self->localBatchNumber)
.detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchLogging.get());
for (int r = 0; r < self->pProxyCommitData->resolvers.size(); r++) {
FlowTransport::transport().resetConnection(self->pProxyCommitData->resolvers[r].address());
}
self->pProxyCommitData->lastResolverReset = now();
}
// Wait for the final resolution
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
@ -1069,14 +1091,16 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
}
self->computeDuration += g_network->timer() - self->computeStart;
if(self->computeDuration > SERVER_KNOBS->MIN_PROXY_COMPUTE && self->batchOperations > 0) {
double computePerOperation = self->computeDuration / self->batchOperations;
if(computePerOperation <= pProxyCommitData->commitComputePerOperation[self->latencyBucket]) {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = computePerOperation;
} else {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
}
if (self->batchOperations > 0) {
double computePerOperation = std::min( SERVER_KNOBS->MAX_COMPUTE_PER_OPERATION, self->computeDuration / self->batchOperations );
if(computePerOperation <= pProxyCommitData->commitComputePerOperation[self->latencyBucket]) {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = computePerOperation;
} else {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
pProxyCommitData->stats.maxComputeNS = std::max<int64_t>(pProxyCommitData->stats.maxComputeNS, 1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
pProxyCommitData->stats.minComputeNS = std::min<int64_t>(pProxyCommitData->stats.minComputeNS, 1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
return Void();
}
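To make the asymmetry of this update concrete: with ``PROXY_COMPUTE_GROWTH_RATE`` at the 0.01 set in the knobs below and a current estimate of 10 µs per operation, a batch measuring 20 µs per operation only raises the estimate to 0.01 × 20 + 0.99 × 10 = 10.1 µs, while a batch measuring 5 µs per operation lowers it immediately to 5 µs; the new ``MAX_COMPUTE_PER_OPERATION`` knob caps each measurement before it enters this update.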

View File

@ -181,8 +181,11 @@ public:
}
vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
vector<StorageServerInterface> v(servers.size());
for (const auto& server : servers) v.push_back(server->lastKnownInterface);
vector<StorageServerInterface> v;
v.reserve(servers.size());
for (const auto& server : servers) {
v.push_back(server->lastKnownInterface);
}
return v;
}
int size() const override {
@ -3908,13 +3911,17 @@ ACTOR Future<Void> storageServerTracker(
}
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled && errorOut.canBeSet())
errorOut.sendError(e);
state Error err = e;
TraceEvent("StorageServerTrackerCancelled", self->distributorId)
.suppressFor(1.0)
.detail("Primary", self->primary)
.detail("Server", server->id);
throw;
.detail("Server", server->id)
.error(e, /*includeCancelled*/ true);
if (e.code() != error_code_actor_cancelled && errorOut.canBeSet()) {
errorOut.sendError(e);
wait(delay(0)); // Check for cancellation, since errorOut.sendError(e) could delete self
}
throw err;
}
}
@ -4486,7 +4493,7 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db,
loop {
wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
state Reference<GrvProxyInfo> grvProxies(new GrvProxyInfo(db->get().client.grvProxies));
state Reference<GrvProxyInfo> grvProxies(new GrvProxyInfo(db->get().client.grvProxies, false));
choose {
when (wait(db->onChange())) {}
@ -4665,6 +4672,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
zeroHealthyTeams.push_back(Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ));
int storageTeamSize = configuration.storageTeamSize;
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
state KeyRangeMap<ShardTrackedData> shards;
vector<Future<Void>> actors;
if (configuration.usableRegions > 1) {
tcis.push_back(TeamCollectionInterface());
@ -4678,8 +4689,16 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
}
actors.push_back( pollMoveKeysLock(cx, lock) );
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, configuration.storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
actors.push_back(reportErrorsExcept(
dataDistributionTracker(initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics,
getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart,
anyZeroHealthyTeams, self->ddId, &shards),
"DDTracker", self->ddId, &normalDDQueueErrors()));
actors.push_back(reportErrorsExcept(
dataDistributionQueue(cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis,
shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId,
storageTeamSize, configuration.storageTeamSize, &lastLimited),
"DDQueue", self->ddId, &normalDDQueueErrors()));
vector<DDTeamCollection*> teamCollectionsPtrs;
primaryTeamCollection = Reference<DDTeamCollection>( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
@ -4704,8 +4723,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
self->teamCollection = nullptr;
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
if( e.code() != error_code_movekeys_conflict )
throw err;
wait(shards.clearAsync());
if (err.code() != error_code_movekeys_conflict) throw err;
bool ddEnabled = wait( isDataDistributionEnabled(cx) );
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if( ddEnabled )

View File

@ -211,6 +211,26 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
Optional<Key> initHealthyZoneValue;
};
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
bool operator==(ShardMetrics const& rhs) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime &&
shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount)
: metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> const& initData,
Database const& cx,
@ -221,7 +241,8 @@ Future<Void> dataDistributionTracker(
FutureStream<Promise<int64_t>> const& getAverageShardBytes,
Promise<Void> const& readyToStart,
Reference<AsyncVar<bool>> const& zeroHealthyTeams,
UID const& distributorId);
UID const& distributorId,
KeyRangeMap<ShardTrackedData>* const& shards);
Future<Void> dataDistributionQueue(
Database const& cx,

View File

@ -95,7 +95,7 @@ class ParallelTCInfo final : public ReferenceCounted<ParallelTCInfo>, public IDa
template <class T>
vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
vector<T> result(teams.size());
vector<T> result;
for (const auto& team : teams) {
vector<T> newItems = func(*team);
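Both vector changes in this commit (here and in getLastKnownServerInterfaces above) fix the same C++ pitfall; a minimal standalone illustration, not FoundationDB code:

    #include <cassert>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::string> src = { "a", "b", "c" };

        std::vector<std::string> buggy(src.size());      // 3 default-constructed strings
        for (const auto& s : src) buggy.push_back(s);
        assert(buggy.size() == 6 && buggy[0].empty());   // "", "", "", "a", "b", "c"

        std::vector<std::string> fixed;
        fixed.reserve(src.size());                       // capacity only, no elements
        for (const auto& s : src) fixed.push_back(s);
        assert(fixed.size() == 3 && fixed[0] == "a");
        return 0;
    }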

View File

@ -35,18 +35,6 @@ enum BandwidthStatus {
enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
bool operator == ( ShardMetrics const& rhs ) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime && shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount) : metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
return BandwidthStatusHigh;
@ -81,16 +69,11 @@ ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstima
}
}
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
struct DataDistributionTracker {
Database cx;
UID distributorId;
KeyRangeMap< ShardTrackedData > shards;
KeyRangeMap<ShardTrackedData>& shards;
ActorCollection sizeChanges;
int64_t systemSizeEstimate;
@ -108,16 +91,19 @@ struct DataDistributionTracker {
// Read hot detection
PromiseStream<KeyRange> readHotShard;
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ), systemSizeEstimate(0),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
Reference<AsyncVar<bool>> anyZeroHealthyTeams, KeyRangeMap<ShardTrackedData>& shards)
: cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar<int64_t>()), systemSizeEstimate(0),
maxShardSize(new AsyncVar<Optional<int64_t>>()), sizeChanges(false), readyToStart(readyToStart), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams),
shards(shards) {}
~DataDistributionTracker()
{
//Cancel all actors so they aren't waiting on sizeChanged broken promise
sizeChanges.clear(false);
shards.insert( allKeys, ShardTrackedData() );
}
};
@ -363,7 +349,7 @@ ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<ShardMetrics>>>
ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, int64_t oldShardsEndingSize ) {
state vector<Future<int64_t>> sizes;
state vector<Future<int64_t>> systemSizes;
for (auto it : self->shards.intersectingRanges(keys) ) {
for (auto it : self->shards.intersectingRanges(keys)) {
Future<int64_t> thisSize = getFirstSize( it->value().stats );
sizes.push_back( thisSize );
if(it->range().begin >= systemKeys.begin) {
@ -722,7 +708,7 @@ ACTOR Future<Void> shardTracker(
}
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingMetrics) {
auto ranges = self->shards.getAffectedRangesAfterInsertion( keys, ShardTrackedData() );
auto ranges = self->shards.getAffectedRangesAfterInsertion(keys, ShardTrackedData());
for(int i=0; i<ranges.size(); i++) {
if( !ranges[i].value.trackShard.isValid() && ranges[i].begin != keys.begin ) {
// When starting, key space will be full of "dummy" default constructed entries.
@ -780,7 +766,7 @@ ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMet
loop {
Future<Void> onChange;
StorageMetrics returnMetrics;
for( auto t : self->shards.intersectingRanges( req.keys ) ) {
for (auto t : self->shards.intersectingRanges(req.keys)) {
auto &stats = t.value().stats;
if( !stats->get().present() ) {
onChange = stats->onChange();
@ -816,7 +802,6 @@ ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsR
return Void();
}
ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, GetMetricsListRequest req ) {
try {
loop {
@ -865,19 +850,16 @@ ACTOR Future<Void> fetchShardMetricsList( DataDistributionTracker* self, GetMetr
return Void();
}
ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId)
{
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure, anyZeroHealthyTeams);
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards) {
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure,
anyZeroHealthyTeams, *shards);
state Future<Void> loggingTrigger = Void();
state Future<Void> readHotDetect = readHotDetector(&self);
try {
@ -890,10 +872,10 @@ ACTOR Future<Void> dataDistributionTracker(
}
when( wait( loggingTrigger ) ) {
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest( "DDTrackerStats" );
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest("DDTrackerStats");
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
}

View File

@ -42,8 +42,35 @@ struct GrvProxyStats {
Future<Void> logger;
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
void updateRequestBuckets() {
while(now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
void addRequest(int transactionCount) {
updateRequestBuckets();
recentRequests += transactionCount;
requestBuckets.back() += transactionCount;
}
int getRecentRequests() {
updateRequestBuckets();
return recentRequests/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
}
explicit GrvProxyStats(UID id)
: cc("GrvProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
: cc("GrvProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
@ -55,6 +82,9 @@ struct GrvProxyStats {
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
logger = traceCounters("GrvProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "GrvProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};
@ -280,6 +310,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
} else {
stats->addRequest(req.transactionCount);
// TODO: check whether this is reasonable to do in the fast path
for(auto tag : req.tags) {
(*transactionTagCounter)[tag.first] += tag.second;
@ -389,8 +420,13 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, Grv
rep.version = repFromMaster.version;
rep.locked = repFromMaster.locked;
rep.metadataVersion = repFromMaster.metadataVersion;
rep.processBusyTime = 1e6 * (g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness);
rep.processBusyTime =
FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION *
std::min((std::numeric_limits<int>::max() / FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION) - 1,
grvProxyData->stats.getRecentRequests());
rep.processBusyTime += FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION *
(g_network->isSimulated() ? deterministicRandom()->random01()
: g_network->networkInfo.metrics.lastRunLoopBusyness);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "GrvProxyServer.getLiveCommittedVersion.After");
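A self-contained sketch of the bucketed request window added to GrvProxyStats above, with an illustrative 10-second window split into 5 buckets standing in for the BASIC_LOAD_BALANCE_* knobs:

    #include <cstdio>
    #include <deque>

    struct RequestWindow {
        double windowSeconds, bucketInterval, lastBucketBegin;
        int recentRequests = 0;
        std::deque<int> buckets;

        RequestWindow(double window, int nBuckets, double now)
            : windowSeconds(window), bucketInterval(window / nBuckets), lastBucketBegin(now),
              buckets(nBuckets, 0) {}

        void roll(double now) {
            // Retire whole buckets that have aged out of the window.
            while (now - lastBucketBegin > bucketInterval) {
                lastBucketBegin += bucketInterval;
                recentRequests -= buckets.front();
                buckets.pop_front();
                buckets.push_back(0);
            }
        }
        void addRequest(double now, int count) {
            roll(now);
            recentRequests += count;
            buckets.back() += count;
        }
        double requestRate(double now) {
            roll(now);
            // Divide by the portion of the window actually covered so far.
            return recentRequests / (windowSeconds - (lastBucketBegin + bucketInterval - now));
        }
    };

    int main() {
        RequestWindow w(10.0, 5, /*now=*/0.0);
        w.addRequest(1.0, 50);
        w.addRequest(6.0, 100);
        std::printf("rate at t=9s: %.1f req/s\n", w.requestRate(9.0)); // 150 requests over ~9s -> 16.7
        return 0;
    }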

View File

@ -348,7 +348,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( COMMIT_TRANSACTION_BATCH_COUNT_MAX, 32768 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_COUNT_MAX = 1000; // Do NOT increase this number beyond 32768, as CommitIds only budget 2 bytes for storing transaction id within each batch
init( COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, 8LL << 30 ); if (randomize && BUGGIFY) COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT = deterministicRandom()->randomInt64(100LL << 20, 8LL << 30);
init( COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL, 0.5 );
init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 10.0 );
init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 5.0 );
// these settings disable batch bytes scaling. Try COMMIT_TRANSACTION_BATCH_BYTES_MAX=1e6, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE=50000, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER=0.5?
init( COMMIT_TRANSACTION_BATCH_BYTES_MIN, 100000 );
@ -368,13 +368,18 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01;
init( ALWAYS_CAUSAL_READ_RISKY, false );
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
init( MIN_PROXY_COMPUTE, 0.001 );
init( MAX_PROXY_COMPUTE, 2.0 );
init( MAX_COMPUTE_PER_OPERATION, 0.1 );
init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( TXN_STATE_SEND_AMOUNT, 4 );
init( REPORT_TRANSACTION_COST_ESTIMATION_DELAY, 0.1 );
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );
init( RESET_MASTER_DELAY, 300.0 );
init( RESET_RESOLVER_DELAY, 300.0 );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
// by delay()ing for this amount of time between accepted batches of TransactionRequests.

View File

@ -299,13 +299,18 @@ public:
double REQUIRED_MIN_RECOVERY_DURATION;
bool ALWAYS_CAUSAL_READ_RISKY;
int MAX_COMMIT_UPDATES;
double MIN_PROXY_COMPUTE;
double MAX_PROXY_COMPUTE;
double MAX_COMPUTE_PER_OPERATION;
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
int TXN_STATE_SEND_AMOUNT;
double REPORT_TRANSACTION_COST_ESTIMATION_DELAY;
int RESET_MASTER_BATCHES;
int RESET_RESOLVER_BATCHES;
double RESET_MASTER_DELAY;
double RESET_RESOLVER_DELAY;
// Master Server
double COMMIT_SLEEP_TIME;
double MIN_BALANCE_TIME;

View File

@ -65,9 +65,24 @@ struct ProxyStats {
Future<Void> logger;
int64_t maxComputeNS;
int64_t minComputeNS;
int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;
return r;
}
int64_t getAndResetMinCompute() {
int64_t r = minComputeNS;
minComputeNS = 1e12;
return r;
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()),
: cc("ProxyStats", id.toString()), maxComputeNS(0), minComputeNS(1e12),
txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc),
txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc),
txnCommitOut("TxnCommitOut", cc), txnCommitOutSuccess("TxnCommitOutSuccess", cc),
@ -84,6 +99,8 @@ struct ProxyStats {
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount",
[commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
specialCounter(cc, "MaxCompute", [this](){ return this->getAndResetMaxCompute(); });
specialCounter(cc, "MinCompute", [this](){ return this->getAndResetMinCompute(); });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
}
};
@ -142,6 +159,8 @@ struct ProxyCommitData {
vector<double> commitComputePerOperation;
UIDTransactionTagMap<TransactionCommitCostEstimation> ssTrTagCommitCost;
double lastMasterReset;
double lastResolverReset;
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
@ -205,7 +224,8 @@ struct ProxyCommitData {
commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), firstProxy(firstProxy),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0),
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0) {
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION),
lastCommitTime(0), lastMasterReset(now()), lastResolverReset(now()) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};
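One plausible use of lastMasterReset/lastResolverReset together with the RESET_* knobs added above (a sketch under stated assumptions; slowBatchCount is hypothetical and this is not necessarily the committed proxy logic):
// Hedged sketch: only reset the master connection after enough consecutive
// slow batches, and only if the previous reset is older than the delay knob.
bool shouldResetMaster(const ProxyCommitData& self, int slowBatchCount) {
	return slowBatchCount > SERVER_KNOBS->RESET_MASTER_BATCHES &&
	       now() - self.lastMasterReset > SERVER_KNOBS->RESET_MASTER_DELAY;
}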

View File

@ -1472,10 +1472,11 @@ public:
// If the page is still being read then it's not also being written because a write places
// the new content into readFuture when the write is launched, not when it is completed.
// Read/write ordering is being enforced waiting readers will not see the new write. This
// Read/write ordering is being enforced so waiting readers will not see the new write. This
// is necessary for remap erasure to work correctly since the oldest version of a page, located
// at the original page ID, could have a pending read when that version is expired and the write
// of the next newest version over top of the original page begins.
// at the original page ID, could have a pending read when that version is expired (after which
// future reads of the version are not allowed) and the write of the next newest version over top
// of the original page begins.
if (!cacheEntry.initialized()) {
cacheEntry.writeFuture = writePhysicalPage(pageID, data);
} else if (cacheEntry.reading()) {
@ -1718,7 +1719,7 @@ public:
secondType = RemappedPage::NONE;
} else {
secondType = RemappedPage::getTypeOf(nextEntry->second);
secondAfterOldestRetainedVersion = nextEntry->first >= oldestRetainedVersion;
secondAfterOldestRetainedVersion = nextEntry->first > oldestRetainedVersion;
}
} else {
ASSERT(iVersionPagePair->second == invalidLogicalPageID);
@ -1767,6 +1768,10 @@ public:
self->filename.c_str(), p.toString().c_str(), secondType, ::toString(*iVersionPagePair).c_str(), oldestRetainedVersion);
if(copyNewToOriginal) {
if(g_network->isSimulated()) {
ASSERT(self->remapDestinationsSimOnly.count(p.originalPageID) == 0);
self->remapDestinationsSimOnly.insert(p.originalPageID);
}
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
// Read the data from the page that the original was mapped to
@ -1829,7 +1834,8 @@ public:
state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
// Minimum version we must pop to before obeying stop command.
state Version minStopVersion = cutoff.version - (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG);
state Version minStopVersion = cutoff.version - (BUGGIFY ? deterministicRandom()->randomInt(0, 10) : (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG));
self->remapDestinationsSimOnly.clear();
loop {
state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
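A worked example of the cutoff arithmetic above, ignoring the BUGGIFY branch and using illustrative numbers:
// Assume oldestRetainedVersion = 1,000,000, remapCleanupWindow = 100,000,
// and REDWOOD_REMAP_CLEANUP_LAG = 2.0. Then:
//   cutoff.version = 1,000,000 - 100,000       = 900,000
//   minStopVersion =   900,000 - 100,000 * 2.0 = 700,000
// so remapCleanup keeps popping remapped pages through version 700,000
// before it will honor a stop command.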
@ -2142,6 +2148,7 @@ private:
RemapQueueT remapQueue;
Version remapCleanupWindow;
std::unordered_set<PhysicalPageID> remapDestinationsSimOnly;
struct SnapshotEntry {
Version version;
@ -5683,12 +5690,13 @@ public:
: m_filePrefix(filePrefix), m_concurrentReads(new FlowLock(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS)) {
// TODO: This constructor should really just take an IVersionedStore
int pageSize = BUGGIFY ? deterministicRandom()->randomInt(1000, 4096*4) : SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE;
int64_t pageCacheBytes = g_network->isSimulated()
? (BUGGIFY ? FLOW_KNOBS->BUGGIFY_SIM_PAGE_CACHE_4K : FLOW_KNOBS->SIM_PAGE_CACHE_4K)
? (BUGGIFY ? deterministicRandom()->randomInt(pageSize, FLOW_KNOBS->BUGGIFY_SIM_PAGE_CACHE_4K) : FLOW_KNOBS->SIM_PAGE_CACHE_4K)
: FLOW_KNOBS->PAGE_CACHE_4K;
Version remapCleanupWindow = BUGGIFY ? deterministicRandom()->randomInt64(0, 1000) : SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;
IPager2* pager = new DWALPager(SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE, filePrefix, pageCacheBytes, remapCleanupWindow);
IPager2* pager = new DWALPager(pageSize, filePrefix, pageCacheBytes, remapCleanupWindow);
m_tree = new VersionedBTree(pager, filePrefix);
m_init = catchError(init_impl(this));
}
@ -7235,7 +7243,7 @@ TEST_CASE("!/redwood/correctness/btree") {
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
state int64_t targetPageOps = shortTest ? 50000 : 1000000;
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .01);
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .001);
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
state int maxValueSize = randomSize(pageSize * 25);
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
@ -7244,10 +7252,9 @@ TEST_CASE("!/redwood/correctness/btree") {
state double clearPostSetProbability = deterministicRandom()->random01() * .1;
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
state double advanceOldVersionProbability = deterministicRandom()->random01();
state int64_t cacheSizeBytes =
pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
state int64_t cacheSizeBytes = pagerMemoryOnly ? 2e9 : (pageSize * deterministicRandom()->randomInt(1, (BUGGIFY ? 2 : 10000) + 1));
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);
state Version remapCleanupWindow = BUGGIFY ? 0 : deterministicRandom()->randomInt64(1, versionIncrement * 50);
state int maxVerificationMapEntries = 300e3;
printf("\n");

View File

@ -4003,7 +4003,7 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
loop {
state Future<Void> infoChanged = self->db->onChange();
state Reference<CommitProxyInfo> commitProxies(new CommitProxyInfo(self->db->get().client.commitProxies));
state Reference<CommitProxyInfo> commitProxies(new CommitProxyInfo(self->db->get().client.commitProxies, false));
choose {
when(GetStorageServerRejoinInfoReply _rep =
wait(commitProxies->size()

View File

@ -423,7 +423,11 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
for (size_t j = 0; j < key_size; ++j)
skey.append(1, (char) deterministicRandom()->randomInt(0, 256));
return Key(skey);
// 15% (= 20% * 75%) of the time, generate keys after \xff\xff to test the special keys code
if (deterministicRandom()->random01() < 0.2)
return Key(skey).withPrefix(specialKeys.begin);
else
return Key(skey);
}
static Value makeValue() {
@ -672,7 +676,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
limit = deterministicRandom()->randomInt(0, INT_MAX)+1;
}
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && specialKeys.begin <= keysel2.getKey() &&
keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair(error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0)),
@ -710,7 +715,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
keysel2 = makeKeySel();
limits = makeRangeLimits();
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && specialKeys.begin <= keysel2.getKey() &&
keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair(error_code_range_limits_invalid,
@ -760,7 +766,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
limit = deterministicRandom()->randomInt(0, INT_MAX)+1;
}
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(key1) && specialKeys.begin <= key2 && key2 <= specialKeys.end;
contract = {
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
@ -800,7 +806,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
key2 = makeKey();
limits = makeRangeLimits();
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(key1) && specialKeys.begin <= key2 && key2 <= specialKeys.end;
contract = {
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
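The extra specialKeys.begin <= key term added in the hunks above closes a gap in the old predicate; a hedged, self-contained illustration (the helper is illustrative, not workload code):
// specialKeys spans ["\xff\xff", "\xff\xff\xff"). With key1 = "\xff\xff/foo"
// and key2 = "test", the old predicate (key2 <= specialKeys.end) held because
// "test" sorts below "\xff\xff", even though key2 lies outside the special
// key space; the added specialKeys.begin <= key2 term rejects it.
bool isSpecialKeyRange(KeyRef key1, KeyRef key2) {
	return specialKeys.contains(key1) && specialKeys.begin <= key2 && key2 <= specialKeys.end;
}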

View File

@ -420,6 +420,18 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
ASSERT(e.code() == error_code_special_keys_cross_module_clear);
tx->reset();
}
// If the base key of the end key selector is not in (\xff\xff, \xff\xff\xff), key_outside_legal_range() should be thrown
try {
const KeySelector startKeySelector = KeySelectorRef(LiteralStringRef("\xff\xff/test"), true, -200);
const KeySelector endKeySelector = KeySelectorRef(LiteralStringRef("test"), true, -10);
Standalone<RangeResultRef> result =
wait(tx->getRange(startKeySelector, endKeySelector, GetRangeLimits(CLIENT_KNOBS->TOO_MANY)));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_key_outside_legal_range);
tx->reset();
}
return Void();
}

View File

@ -162,7 +162,17 @@ struct VersionStampWorkload : TestWorkload {
state ReadYourWritesTransaction tr(cx);
// We specifically wish to grab the smallest read version that we can get and maintain it, to
// have the strictest check we can that versionstamps are monotonically increasing.
state Version readVersion = wait(tr.getReadVersion());
state Version readVersion;
loop {
try {
Version _readVersion = wait(tr.getReadVersion());
readVersion = _readVersion;
break;
}
catch(Error &e) {
wait(tr.onError(e));
}
}
if(BUGGIFY) {
if(deterministicRandom()->random01() < 0.5) {

View File

@ -562,7 +562,7 @@ public:
memcpy(dst, data, length);
return dst + length;
}
std::vector<StringRef> splitAny(StringRef sep) const {
StringRef r = *this;
std::vector<StringRef> tokens;
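Assuming splitAny() tokenizes on any byte contained in sep (a hedged reading of the signature above; the semantics are not shown in this hunk), usage would look like:
// Hedged usage sketch for StringRef::splitAny().
StringRef line = LiteralStringRef("a,b;c");
std::vector<StringRef> tokens = line.splitAny(LiteralStringRef(",;"));
// tokens is expected to hold { "a", "b", "c" }.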

View File

@ -503,6 +503,8 @@ public:
Map(Map&& r) noexcept : set(std::move(r.set)) {}
void operator=(Map&& r) noexcept { set = std::move(r.set); }
Future<Void> clearAsync();
private:
Map( Map<Key,Value,Pair> const& ); // unimplemented
void operator=( Map<Key,Value,Pair> const& ); // unimplemented
@ -1311,4 +1313,9 @@ Future<Void> IndexedSet<T, Metric>::eraseAsync(typename IndexedSet<T,Metric>::it
return uncancellable(ISFreeNodes(toFree, false));
}
template <class Key, class Value, class Pair, class Metric>
Future<Void> Map<Key, Value, Pair, Metric>::clearAsync() {
return set.eraseAsync(set.begin(), set.end());
}
#endif
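A minimal usage sketch for the new Map::clearAsync(), assuming it is awaited from a flow actor:
// Hedged sketch: clear a potentially large map without blocking the run loop;
// clearAsync() delegates to IndexedSet::eraseAsync() over the whole set.
ACTOR Future<Void> clearAll(Map<Key, Version>* m) {
	wait(m->clearAsync());
	return Void();
}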

View File

@ -84,6 +84,8 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
init( PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT, 3600.0 );
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( PING_LOGGING_INTERVAL, 3.0 );
init( PING_SAMPLE_AMOUNT, 100 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
@ -216,7 +218,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 10.0 ); //should be longer than the rate we log network metrics
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.10 );
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
init( BASIC_LOAD_BALANCE_MIN_AMOUNT, 50000 ); //Will not update probabilities if the average proxy busyness is less than 5%
init( BASIC_LOAD_BALANCE_MIN_REQUESTS, 20 ); //do not adjust LB probabilities if the proxies are releasing fewer than 20 transactions per second
init( BASIC_LOAD_BALANCE_MIN_CPU, 0.05 ); //do not adjust LB probabilities if the proxies are less than 5% utilized
init( BASIC_LOAD_BALANCE_BUCKETS, 40 ); //proxies bin recent GRV requests into 40 time bins
init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 10000 ); //determines how precisely the LB usage's share of the proxy's CPU usage is tracked
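Taken together, the comments above imply a gate like the following before the GRV load-balance probabilities are adjusted (a sketch; the helper and its inputs are hypothetical):
// Hypothetical gate: skip probability updates when there is too little signal.
bool shouldAdjustLoadBalance(double requestsPerSecond, double proxyCpu) {
	return requestsPerSecond >= FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_REQUESTS &&
	       proxyCpu >= FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_CPU;
}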
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -98,6 +98,8 @@ public:
double ALWAYS_ACCEPT_DELAY;
int ACCEPT_BATCH_SIZE;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
double PING_LOGGING_INTERVAL;
int PING_SAMPLE_AMOUNT;
int TLS_CERT_REFRESH_DELAY_SECONDS;
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
@ -235,7 +237,10 @@ public:
double BASIC_LOAD_BALANCE_UPDATE_RATE;
double BASIC_LOAD_BALANCE_MAX_CHANGE;
double BASIC_LOAD_BALANCE_MAX_PROB;
double BASIC_LOAD_BALANCE_MIN_AMOUNT;
int BASIC_LOAD_BALANCE_BUCKETS;
int BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
double BASIC_LOAD_BALANCE_MIN_REQUESTS;
double BASIC_LOAD_BALANCE_MIN_CPU;
// Health Monitor
int FAILURE_DETECTION_DELAY;

View File

@ -629,30 +629,10 @@ public:
init();
}
ACTOR static void doAcceptHandshake( Reference<SSLConnection> self, Promise<Void> connected) {
state std::pair<IPAddress,uint16_t> peerIP;
ACTOR static void doAcceptHandshake( Reference<SSLConnection> self, Promise<Void> connected ) {
state Hold<int> holder;
try {
peerIP = std::make_pair(self->getPeerAddress().ip, static_cast<uint16_t>(0));
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
if (now() < iter->second.second) {
if(iter->second.first >= FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS) {
TraceEvent("TLSIncomingConnectionThrottlingWarning").suppressFor(1.0).detail("PeerIP", peerIP.first.toString());
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT));
self->closeSocket();
connected.sendError(connection_failed());
return;
}
} else {
g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP);
}
}
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Future<Void> onHandshook;
// If the background handshakers are not all busy, use one
@ -672,24 +652,51 @@ public:
wait(delay(0, TaskPriority::Handshake));
connected.send(Void());
} catch (...) {
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT);
}
self->closeSocket();
connected.sendError(connection_failed());
}
}
ACTOR static Future<Void> acceptHandshakeWrapper( Reference<SSLConnection> self ) {
state std::pair<IPAddress,uint16_t> peerIP;
peerIP = std::make_pair(self->getPeerAddress().ip, static_cast<uint16_t>(0));
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
if (now() < iter->second.second) {
if(iter->second.first >= FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS) {
TraceEvent("TLSIncomingConnectionThrottlingWarning").suppressFor(1.0).detail("PeerIP", peerIP.first.toString());
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT));
self->closeSocket();
throw connection_failed();
}
} else {
g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP);
}
}
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Promise<Void> connected;
doAcceptHandshake(self, connected);
try {
wait(connected.getFuture());
return Void();
choose {
when(wait(connected.getFuture())) {
return Void();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
throw connection_failed();
}
}
} catch (Error& e) {
if(e.code() != error_code_actor_cancelled) {
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT);
}
}
// Either the connection failed, or was cancelled by the caller
self->closeSocket();
throw;
@ -704,9 +711,6 @@ public:
state Hold<int> holder;
try {
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Future<Void> onHandshook;
// If the background handshakers are not all busy, use one
if(N2::g_net2->sslPoolHandshakesInProgress < N2::g_net2->sslHandshakerThreadsStarted) {
@ -725,26 +729,37 @@ public:
wait(delay(0, TaskPriority::Handshake));
connected.send(Void());
} catch (...) {
std::pair<IPAddress,uint16_t> peerIP = std::make_pair(self->peer_address.ip, self->peer_address.port);
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT);
}
self->closeSocket();
connected.sendError(connection_failed());
}
}
ACTOR static Future<Void> connectHandshakeWrapper( Reference<SSLConnection> self ) {
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Promise<Void> connected;
doConnectHandshake(self, connected);
try {
wait(connected.getFuture());
return Void();
choose {
when(wait(connected.getFuture())) {
return Void();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
throw connection_failed();
}
}
} catch (Error& e) {
// Either the connection failed, or was cancelled by the caller
if(e.code() != error_code_actor_cancelled) {
std::pair<IPAddress,uint16_t> peerIP = std::make_pair(self->peer_address.ip, self->peer_address.port);
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT);
}
}
self->closeSocket();
throw;
}
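Both handshake wrappers above follow the same shape; a condensed, illustrative version of the timeout portion (not the exact Net2 code):
// Hedged sketch: bound a background handshake by CONNECTION_MONITOR_TIMEOUT.
ACTOR static Future<Void> waitForHandshake(Future<Void> handshakeDone) {
	choose {
		when(wait(handshakeDone)) { return Void(); }
		when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) { throw connection_failed(); }
	}
}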

View File

@ -163,6 +163,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
}
for (auto const& itr : loggedDurations) {
// PriorityBusyX measures the amount of time spent busy at exactly priority X
n.detail(format("PriorityBusy%d", itr.first).c_str(), itr.second);
}
@ -174,6 +175,9 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
itr.windowedTimer = now();
}
// PriorityStarvedBelowX: how much of the elapsed time we were running tasks at a priority at or above X
// PriorityMaxStarvedBelowX: the longest single span of time during which tasks below that priority were starved,
// which can indicate whether the higher-priority work arrives in bursts.
n.detail(format("PriorityStarvedBelow%d", itr.priority).c_str(), std::min(currentStats.elapsed, itr.duration));
n.detail(format("PriorityMaxStarvedBelow%d", itr.priority).c_str(), itr.maxDuration);

View File

@ -347,7 +347,7 @@ struct NetworkMetrics {
static const std::vector<int> starvationBins;
NetworkMetrics() {
NetworkMetrics() : lastRunLoopBusyness(0) {
for(int priority : starvationBins) {
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
}

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{0AB36B0F-2187-4ECD-9E7E-983EDD966CEB}'
Id='{4F01CFD6-596A-4224-BF7D-4AEF30BA2083}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'