Merge remote-tracking branch 'origin/master' into remove-global-ddenabled-flag

sfc-gh-tclinkenbeard 2020-10-21 18:22:08 -07:00
commit e0b1f95740
63 changed files with 1000 additions and 577 deletions

View File

@ -7,7 +7,7 @@ ADD artifacts /mnt/artifacts
# Install build tools for building via make
RUN \
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm cgdb
yum install -y distcc-server gperf rubygems python34 libmpc-devel npm cgdb jq
# Download and install llvm-10.0.0
RUN cd / &&\
@ -50,8 +50,8 @@ RUN cp -iv /usr/local/bin/clang++ /usr/local/bin/clang++.deref &&\
ldconfig &&\
rm -rf /mnt/artifacts
LABEL version=0.11.8
ENV DOCKER_IMAGEVER=0.11.8
LABEL version=0.11.9
ENV DOCKER_IMAGEVER=0.11.9
ENV CLANGCC=/usr/local/bin/clang.de8a65ef
ENV CLANGCXX=/usr/local/bin/clang++.de8a65ef

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.3.8.pkg <https://www.foundationdb.org/downloads/6.3.8/macOS/installers/FoundationDB-6.3.8.pkg>`_
* `FoundationDB-6.3.9.pkg <https://www.foundationdb.org/downloads/6.3.9/macOS/installers/FoundationDB-6.3.9.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.3.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.8/ubuntu/installers/foundationdb-clients_6.3.8-1_amd64.deb>`_
* `foundationdb-server-6.3.8-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.8/ubuntu/installers/foundationdb-server_6.3.8-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.3.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.9/ubuntu/installers/foundationdb-clients_6.3.9-1_amd64.deb>`_
* `foundationdb-server-6.3.9-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.9/ubuntu/installers/foundationdb-server_6.3.9-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.3.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel6/installers/foundationdb-clients-6.3.8-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.8-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel6/installers/foundationdb-server-6.3.8-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel6/installers/foundationdb-clients-6.3.9-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.9-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel6/installers/foundationdb-server-6.3.9-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.3.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel7/installers/foundationdb-clients-6.3.8-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.8-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.8/rhel7/installers/foundationdb-server-6.3.8-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel7/installers/foundationdb-clients-6.3.9-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.9-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.9/rhel7/installers/foundationdb-server-6.3.9-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.3.8-x64.msi <https://www.foundationdb.org/downloads/6.3.8/windows/installers/foundationdb-6.3.8-x64.msi>`_
* `foundationdb-6.3.9-x64.msi <https://www.foundationdb.org/downloads/6.3.9/windows/installers/foundationdb-6.3.9-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
* `foundationdb-6.3.8.tar.gz <https://www.foundationdb.org/downloads/6.3.8/bindings/python/foundationdb-6.3.8.tar.gz>`_
* `foundationdb-6.3.9.tar.gz <https://www.foundationdb.org/downloads/6.3.9/bindings/python/foundationdb-6.3.9.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.3.8.gem <https://www.foundationdb.org/downloads/6.3.8/bindings/ruby/fdb-6.3.8.gem>`_
* `fdb-6.3.9.gem <https://www.foundationdb.org/downloads/6.3.9/bindings/ruby/fdb-6.3.9.gem>`_
Java 8+
-------
* `fdb-java-6.3.8.jar <https://www.foundationdb.org/downloads/6.3.8/bindings/java/fdb-java-6.3.8.jar>`_
* `fdb-java-6.3.8-javadoc.jar <https://www.foundationdb.org/downloads/6.3.8/bindings/java/fdb-java-6.3.8-javadoc.jar>`_
* `fdb-java-6.3.9.jar <https://www.foundationdb.org/downloads/6.3.9/bindings/java/fdb-java-6.3.9.jar>`_
* `fdb-java-6.3.9-javadoc.jar <https://www.foundationdb.org/downloads/6.3.9/bindings/java/fdb-java-6.3.9-javadoc.jar>`_
Go 1.11+
--------

View File

@ -446,6 +446,7 @@
}
],
"recovery_state":{
"seconds_since_last_recovered":1,
"required_resolvers":1,
"required_commit_proxies":1,
"required_grv_proxies":1,

View File

@ -2,9 +2,17 @@
Release Notes
#############
6.2.27
======
* For clusters with a large number of shards, avoid slow tasks in the data distributor by adding yields to the shard map destruction. `(PR #3834) <https://github.com/apple/foundationdb/pull/3834>`_
* Reset the network connection between a proxy and master or resolvers if the proxy is too far behind in processing transactions. `(PR #3891) <https://github.com/apple/foundationdb/pull/3891>`_
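A flow-style sketch of the yielding technique from the first fix above, assuming a hypothetical destroyShardMap helper and ShardInfo type (the actual PR #3834 code differs; flow ACTORs require the actor compiler): destroying a very large map in one step blocks the run loop, so the destruction yields periodically.

ACTOR static Future<Void> destroyShardMap(std::map<std::string, ShardInfo>* shards) {
	state int erased = 0;
	state std::map<std::string, ShardInfo>::iterator it = shards->begin();
	while (it != shards->end()) {
		it = shards->erase(it);
		if (++erased % 1000 == 0) {
			wait(yield()); // let other tasks on the run loop make progress between batches
		}
	}
	return Void();
}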
6.2.26
======
* Attempt to detect when calling :func:`fdb_future_block_until_ready` would cause a deadlock, and throw ``blocked_from_network_thread`` if it would definitely cause a deadlock.
* Fixed undefined behavior in configuring supported FoundationDB versions while starting up a client. `(PR #3849) <https://github.com/apple/foundationdb/pull/3849>`_
* Updated OpenSSL to version 1.1.1h. `(PR #3809) <https://github.com/apple/foundationdb/pull/3809>`_
* Attempt to detect when calling :func:`fdb_future_block_until_ready` would cause a deadlock, and throw ``blocked_from_network_thread`` if it would definitely cause a deadlock. `(PR #3786) <https://github.com/apple/foundationdb/pull/3786>`_
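For illustration, a sketch of reacting to this new behavior through the C API; fdb_future_block_until_ready and fdb_get_error are existing fdb_c functions, while the wrapper and its error-handling policy are assumptions:

#define FDB_API_VERSION 620
#include <foundationdb/fdb_c.h>
#include <stdio.h>

// Block on a future; with 6.2.26+, a wait issued from the network thread can
// fail fast with blocked_from_network_thread instead of silently deadlocking.
fdb_error_t wait_for_future(FDBFuture* f) {
	fdb_error_t err = fdb_future_block_until_ready(f);
	if (err) {
		fprintf(stderr, "wait failed: %s\n", fdb_get_error(err));
	}
	return err;
}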
6.2.25
======
@ -15,50 +23,31 @@ Release Notes
6.2.24
======
Features
--------
* Added the ``suspend`` command to ``fdbcli`` which kills a process and prevents it from rejoining the cluster for a specified duration. `(PR #3550) <https://github.com/apple/foundationdb/pull/3550>`_
6.2.23
======
Fixes
-----
* When configured with ``usable_regions=2``, data distribution could temporarily lower the replication of a shard when moving it. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
* Prevent data distribution from running out of memory by fetching the source servers for too many shards in parallel. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
* Reset network connections between log routers and satellite tlogs if the latencies are larger than 500ms. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
Status
------
* Added per-process server request latency statistics reported in the role section of relevant processes. These are named ``grv_latency_statistics`` and ``commit_latency_statistics`` on proxy roles and ``read_latency_statistics`` on storage roles. `(PR #3480) <https://github.com/apple/foundationdb/pull/3480>`_
* Added ``cluster.active_primary_dc`` that indicates which datacenter is serving as the primary datacenter in multi-region setups. `(PR #3320) <https://github.com/apple/foundationdb/pull/3320>`_
6.2.22
======
Fixes
-----
* Coordinator class processes could be recruited as the cluster controller. `(PR #3282) <https://github.com/apple/foundationdb/pull/3282>`_
* HTTPS requests made by backup would fail (introduced in 6.2.21). `(PR #3284) <https://github.com/apple/foundationdb/pull/3284>`_
6.2.21
======
Fixes
-----
* HTTPS requests made by backup could hang indefinitely. `(PR #3027) <https://github.com/apple/foundationdb/pull/3027>`_
* ``fdbrestore`` prefix options required exactly a single hyphen instead of the standard two. `(PR #3056) <https://github.com/apple/foundationdb/pull/3056>`_
* Commits could stall on a newly elected proxy because of inaccurate compute estimates. `(PR #3123) <https://github.com/apple/foundationdb/pull/3123>`_
* A transaction class process with a bad disk could be repeatedly recruited as a transaction log. `(PR #3268) <https://github.com/apple/foundationdb/pull/3268>`_
* Fix a potential race condition that could lead to undefined behavior when connecting to a database using the multi-version client API. `(PR #3265) <https://github.com/apple/foundationdb/pull/3265>`_
Features
--------
* Added the ``getversion`` command to ``fdbcli`` which returns the current read version of the cluster. `(PR #2882) <https://github.com/apple/foundationdb/pull/2882>`_
* Added the ``advanceversion`` command to ``fdbcli`` which increases the current version of a cluster. `(PR #2965) <https://github.com/apple/foundationdb/pull/2965>`_
* Added the ``lock`` and ``unlock`` commands to ``fdbcli`` which lock or unlock a cluster. `(PR #2890) <https://github.com/apple/foundationdb/pull/2890>`_
@ -66,9 +55,6 @@ Features
6.2.20
======
Fixes
-----
* In rare scenarios, clients could send corrupted data to the server. `(PR #2976) <https://github.com/apple/foundationdb/pull/2976>`_
* Internal tools like ``fdbbackup`` are no longer tracked as clients in status (introduced in 6.2.18) `(PR #2849) <https://github.com/apple/foundationdb/pull/2849>`_
* Changed TLS error handling to match the behavior of 6.2.15. `(PR #2993) <https://github.com/apple/foundationdb/pull/2993>`_ `(PR #2977) <https://github.com/apple/foundationdb/pull/2977>`_
@ -76,9 +62,6 @@ Fixes
6.2.19
======
Fixes
-----
* Protect the proxies from running out of memory when bombarded with requests from clients. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* One process with a ``proxy`` class would not become the first proxy when put with other ``stateless`` class processes. `(PR #2819) <https://github.com/apple/foundationdb/pull/2819>`_.
* If a transaction log stalled on a disk operation during recruitment the cluster would become unavailable until the process died. `(PR #2815) <https://github.com/apple/foundationdb/pull/2815>`_.
@ -86,70 +69,37 @@ Fixes
* Prevent the cluster from having too many active generations as a safety measure against repeated failures. `(PR #2814) <https://github.com/apple/foundationdb/pull/2814>`_.
* ``fdbcli`` status JSON could become truncated because of unprintable characters. `(PR #2807) <https://github.com/apple/foundationdb/pull/2807>`_.
* The data distributor used too much CPU in large clusters (broken in 6.2.16). `(PR #2806) <https://github.com/apple/foundationdb/pull/2806>`_.
Status
------
* Added ``cluster.workload.operations.memory_errors`` to measure the number of requests rejected by the proxies because the memory limit has been exceeded. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* Added ``cluster.workload.operations.location_requests`` to measure the number of outgoing key server location responses from the proxies. `(PR #2812) <https://github.com/apple/foundationdb/pull/2812>`_.
* Added ``cluster.recovery_state.active_generations`` to track the number of generations for which the cluster still requires transaction logs. `(PR #2814) <https://github.com/apple/foundationdb/pull/2814>`_.
* Added ``network.tls_policy_failures`` to the ``processes`` section to record the number of TLS policy failures each process has observed. `(PR #2811) <https://github.com/apple/foundationdb/pull/2811>`_.
Features
--------
* Added ``--debug-tls`` as a command line argument to ``fdbcli`` to help diagnose TLS issues. `(PR #2810) <https://github.com/apple/foundationdb/pull/2810>`_.
6.2.18
======
Fixes
-----
* When configuring a cluster to usable_regions=2, data distribution would not react to machine failures while copying data to the remote region. `(PR #2774) <https://github.com/apple/foundationdb/pull/2774>`_.
* When a cluster is configured with usable_regions=2, data distribution could push a cluster into saturation by relocating too many shards simultaneously. `(PR #2776) <https://github.com/apple/foundationdb/pull/2776>`_.
* Do not allow the cluster controller to mark any process as failed within 30 seconds of startup. `(PR #2780) <https://github.com/apple/foundationdb/pull/2780>`_.
* Backup could not establish TLS connections (broken in 6.2.16). `(PR #2775) <https://github.com/apple/foundationdb/pull/2775>`_.
* Certificates were not refreshed automatically (broken in 6.2.16). `(PR #2781) <https://github.com/apple/foundationdb/pull/2781>`_.
Performance
-----------
* Improved the efficiency of establishing large numbers of network connections. `(PR #2777) <https://github.com/apple/foundationdb/pull/2777>`_.
Features
--------
* Add support for setting knobs to modify the behavior of ``fdbcli``. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
Other Changes
-------------
* Setting invalid knobs in backup and DR binaries is now a warning instead of an error and will not result in the application being terminated. `(PR #2773) <https://github.com/apple/foundationdb/pull/2773>`_.
6.2.17
======
Fixes
-----
* Restored the ability to set TLS configuration using environment variables (broken in 6.2.16). `(PR #2755) <https://github.com/apple/foundationdb/pull/2755>`_.
6.2.16
======
Performance
-----------
* Reduced tail commit latencies by improving commit pipelining on the proxies. `(PR #2589) <https://github.com/apple/foundationdb/pull/2589>`_.
* Data distribution does a better job balancing data when disks are more than 70% full. `(PR #2722) <https://github.com/apple/foundationdb/pull/2722>`_.
* Reverse range reads could read too much data from disk, resulting in poor performance relative to forward range reads. `(PR #2650) <https://github.com/apple/foundationdb/pull/2650>`_.
* Switched from LibreSSL to OpenSSL to improve the speed of establishing connections. `(PR #2646) <https://github.com/apple/foundationdb/pull/2646>`_.
* The cluster controller does a better job avoiding multiple recoveries when first recruited. `(PR #2698) <https://github.com/apple/foundationdb/pull/2698>`_.
Fixes
-----
* Storage servers could fail to advance their version correctly in response to empty commits. `(PR #2617) <https://github.com/apple/foundationdb/pull/2617>`_.
* Status could not label more than 5 processes as proxies. `(PR #2653) <https://github.com/apple/foundationdb/pull/2653>`_.
* The ``TR_FLAG_DISABLE_MACHINE_TEAM_REMOVER``, ``TR_FLAG_REMOVE_MT_WITH_MOST_TEAMS``, ``TR_FLAG_DISABLE_SERVER_TEAM_REMOVER``, and ``BUGGIFY_ALL_COORDINATION`` knobs could not be set at runtime. `(PR #2661) <https://github.com/apple/foundationdb/pull/2661>`_.
@ -161,17 +111,11 @@ Fixes
6.2.15
======
Fixes
-----
* TLS throttling could block legitimate connections. `(PR #2575) <https://github.com/apple/foundationdb/pull/2575>`_.
6.2.14
======
Fixes
-----
* Data distribution was prioritizing shard merges too highly. `(PR #2562) <https://github.com/apple/foundationdb/pull/2562>`_.
* Status would incorrectly mark clusters as having no fault tolerance. `(PR #2562) <https://github.com/apple/foundationdb/pull/2562>`_.
* A proxy could run out of memory if disconnected from the cluster for too long. `(PR #2562) <https://github.com/apple/foundationdb/pull/2562>`_.
@ -179,26 +123,16 @@ Fixes
6.2.13
======
Performance
-----------
* Optimized the commit path on the proxies to significantly reduce commit latencies in large clusters. `(PR #2536) <https://github.com/apple/foundationdb/pull/2536>`_.
* Data distribution could create temporarily untrackable shards which could not be split if they became hot. `(PR #2546) <https://github.com/apple/foundationdb/pull/2546>`_.
6.2.12
======
Performance
-----------
* Throttle TLS connect attempts from misconfigured clients. `(PR #2529) <https://github.com/apple/foundationdb/pull/2529>`_.
* Reduced master recovery times in large clusters. `(PR #2430) <https://github.com/apple/foundationdb/pull/2430>`_.
* Improved performance while a remote region is catching up. `(PR #2527) <https://github.com/apple/foundationdb/pull/2527>`_.
* The data distribution algorithm does a better job preventing hot shards while recovering from machine failures. `(PR #2526) <https://github.com/apple/foundationdb/pull/2526>`_.
Fixes
-----
* Improve the reliability of a ``kill`` command from ``fdbcli``. `(PR #2512) <https://github.com/apple/foundationdb/pull/2512>`_.
* The ``--traceclock`` parameter to fdbserver incorrectly had no effect. `(PR #2420) <https://github.com/apple/foundationdb/pull/2420>`_.
* Clients could throw an internal error during ``commit`` if client buggification was enabled. `(PR #2427) <https://github.com/apple/foundationdb/pull/2427>`_.
@ -208,9 +142,6 @@ Fixes
6.2.11
======
Fixes
-----
* Clients could hang indefinitely on reads if all storage servers holding a keyrange were removed from a cluster since the last time the client read a key in the range. `(PR #2377) <https://github.com/apple/foundationdb/pull/2377>`_.
* In rare scenarios, status could falsely report no replicas remain of some data. `(PR #2380) <https://github.com/apple/foundationdb/pull/2380>`_.
* Latency band tracking could fail to configure correctly after a recovery or upon process startup. `(PR #2371) <https://github.com/apple/foundationdb/pull/2371>`_.
@ -218,17 +149,11 @@ Fixes
6.2.10
======
Fixes
-----
* ``backup_agent`` crashed on startup. `(PR #2356) <https://github.com/apple/foundationdb/pull/2356>`_.
6.2.9
=====
Fixes
-----
* Small clusters using specific sets of process classes could cause the data distributor to be continuously killed and re-recruited. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* The data distributor and ratekeeper could be recruited on non-optimal processes. `(PR #2344) <https://github.com/apple/foundationdb/pull/2344>`_.
* A ``kill`` command from ``fdbcli`` could take a long time before being executed by a busy process. `(PR #2339) <https://github.com/apple/foundationdb/pull/2339>`_.
@ -238,9 +163,6 @@ Fixes
6.2.8
=====
Fixes
-----
* Significantly improved the rate at which the transaction logs in a remote region can pull data from the primary region. `(PR #2307) <https://github.com/apple/foundationdb/pull/2307>`_ `(PR #2323) <https://github.com/apple/foundationdb/pull/2323>`_.
* The ``system_kv_size_bytes`` status field could report a size much larger than the actual size of the system keyspace. `(PR #2305) <https://github.com/apple/foundationdb/pull/2305>`_.

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.3.8
6.3.9
=====
Features
@ -61,6 +61,8 @@ Fixes
* In very rare scenarios, the data distributor process would crash when being shutdown. `(PR #3530) <https://github.com/apple/foundationdb/pull/3530>`_
* The master would die immediately if it did not have the correct cluster controller interface when recruited. [6.3.4] `(PR #3537) <https://github.com/apple/foundationdb/pull/3537>`_
* Fix an issue where ``fdbcli --exec 'exclude no_wait ...'`` would incorrectly report that processes can safely be removed from the cluster. [6.3.5] `(PR #3566) <https://github.com/apple/foundationdb/pull/3566>`_
* Commit latencies could become large because of inaccurate compute estimates. [6.3.9] `(PR #3845) <https://github.com/apple/foundationdb/pull/3845>`_
* Added a timeout on TLS handshakes to prevent them from hanging indefinitely. [6.3.9] `(PR #3850) <https://github.com/apple/foundationdb/pull/3850>`_
Status
------
@ -130,6 +132,7 @@ Fixes only impacting 6.3.0+
* Adjusted the proxy load balancing algorithm to be based on the CPU usage of the process instead of the number of requests processed. [6.3.5] `(PR #3653) <https://github.com/apple/foundationdb/pull/3653>`_
* Only return the error code ``batch_transaction_throttled`` for API versions greater than or equal to 630. [6.3.6] `(PR #3799) <https://github.com/apple/foundationdb/pull/3799>`_
* The fault tolerance calculation in status did not take into account region configurations. [6.3.8] `(PR #3836) <https://github.com/apple/foundationdb/pull/3836>`_
* Get read version tail latencies were high because some proxies were serving more read versions than other proxies. [6.3.9] `(PR #3845) <https://github.com/apple/foundationdb/pull/3845>`_
Earlier release notes
---------------------

View File

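The recurring change in this file replaces a ``static const`` member declaration plus its out-of-line definition with a single in-class ``static constexpr`` definition. A minimal before/after sketch using a hypothetical ExampleTaskFunc (the real structs also derive from TaskFuncBase):

#include <cstdint>

namespace before {
struct ExampleTaskFunc {
	static const uint32_t version; // declaration only
};
const uint32_t ExampleTaskFunc::version = 1; // separate out-of-line definition
} // namespace before

namespace after {
struct ExampleTaskFunc {
	static constexpr uint32_t version = 1; // declared and defined in one place
};
} // namespace after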
@ -129,9 +129,9 @@ namespace dbBackup {
struct BackupRangeTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<int64_t> bytesWritten() { return LiteralStringRef(__FUNCTION__); }
} Params;
@ -421,16 +421,15 @@ namespace dbBackup {
};
StringRef BackupRangeTaskFunc::name = LiteralStringRef("dr_backup_range");
const uint32_t BackupRangeTaskFunc::version = 1;
const Key BackupRangeTaskFunc::keyAddBackupRangeTasks = LiteralStringRef("addBackupRangeTasks");
const Key BackupRangeTaskFunc::keyBackupRangeBeginKey = LiteralStringRef("backupRangeBeginKey");
REGISTER_TASKFUNC(BackupRangeTaskFunc);
struct FinishFullBackupTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Subspace states = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyStates).get(task->params[BackupAgentBase::keyConfigLogUid]);
wait(checkTaskVersion(tr, task, FinishFullBackupTaskFunc::name, FinishFullBackupTaskFunc::version));
@ -467,14 +466,13 @@ namespace dbBackup {
};
StringRef FinishFullBackupTaskFunc::name = LiteralStringRef("dr_finish_full_backup");
const uint32_t FinishFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(FinishFullBackupTaskFunc);
struct EraseLogRangeTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
@ -523,14 +521,13 @@ namespace dbBackup {
}
};
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("dr_erase_log_range");
const uint32_t EraseLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(EraseLogRangeTaskFunc);
struct CopyLogRangeTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<int64_t> bytesWritten() { return LiteralStringRef(__FUNCTION__); }
} Params;
@ -773,15 +770,14 @@ namespace dbBackup {
}
};
StringRef CopyLogRangeTaskFunc::name = LiteralStringRef("dr_copy_log_range");
const uint32_t CopyLogRangeTaskFunc::version = 1;
const Key CopyLogRangeTaskFunc::keyNextBeginVersion = LiteralStringRef("nextBeginVersion");
REGISTER_TASKFUNC(CopyLogRangeTaskFunc);
struct CopyLogsTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Subspace conf = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(task->params[BackupAgentBase::keyConfigLogUid]);
state Subspace states = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyStates).get(task->params[BackupAgentBase::keyConfigLogUid]);
wait(checkTaskVersion(tr, task, CopyLogsTaskFunc::name, CopyLogsTaskFunc::version));
@ -876,13 +872,12 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef CopyLogsTaskFunc::name = LiteralStringRef("dr_copy_logs");
const uint32_t CopyLogsTaskFunc::version = 1;
REGISTER_TASKFUNC(CopyLogsTaskFunc);
struct FinishedFullBackupTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static const Key keyInsertTask;
StringRef getName() const { return name; };
@ -976,15 +971,14 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef FinishedFullBackupTaskFunc::name = LiteralStringRef("dr_finished_full_backup");
const uint32_t FinishedFullBackupTaskFunc::version = 1;
const Key FinishedFullBackupTaskFunc::keyInsertTask = LiteralStringRef("insertTask");
REGISTER_TASKFUNC(FinishedFullBackupTaskFunc);
struct CopyDiffLogsTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Subspace conf = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(task->params[BackupAgentBase::keyConfigLogUid]);
state Subspace states = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyStates).get(task->params[BackupAgentBase::keyConfigLogUid]);
wait(checkTaskVersion(tr, task, CopyDiffLogsTaskFunc::name, CopyDiffLogsTaskFunc::version));
@ -1059,15 +1053,14 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef CopyDiffLogsTaskFunc::name = LiteralStringRef("dr_copy_diff_logs");
const uint32_t CopyDiffLogsTaskFunc::version = 1;
REGISTER_TASKFUNC(CopyDiffLogsTaskFunc);
// Skip unneeded EraseLogRangeTaskFunc in 5.1
struct SkipOldEraseLogRangeTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<TaskFuture> taskFuture = futureBucket->unpack(task->params[Task::reservedTaskParamKeyDone]);
wait(taskFuture->set(tr, taskBucket) && taskBucket->finish(tr, task));
return Void();
@ -1079,16 +1072,15 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef SkipOldEraseLogRangeTaskFunc::name = LiteralStringRef("dr_skip_legacy_task");
const uint32_t SkipOldEraseLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(SkipOldEraseLogRangeTaskFunc);
REGISTER_TASKFUNC_ALIAS(SkipOldEraseLogRangeTaskFunc, db_erase_log_range);
// This is almost the same as CopyLogRangeTaskFunc in 5.1. The only purpose is to support DR upgrade
struct OldCopyLogRangeTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<int64_t> bytesWritten() { return LiteralStringRef(__FUNCTION__); }
} Params;
@ -1255,15 +1247,14 @@ namespace dbBackup {
}
};
StringRef OldCopyLogRangeTaskFunc::name = LiteralStringRef("db_copy_log_range");
const uint32_t OldCopyLogRangeTaskFunc::version = 1;
const Key OldCopyLogRangeTaskFunc::keyNextBeginVersion = LiteralStringRef("nextBeginVersion");
REGISTER_TASKFUNC(OldCopyLogRangeTaskFunc);
struct AbortOldBackupTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state DatabaseBackupAgent srcDrAgent(taskBucket->src);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state Key tagNameKey;
@ -1316,7 +1307,6 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef AbortOldBackupTaskFunc::name = LiteralStringRef("dr_abort_legacy_backup");
const uint32_t AbortOldBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(AbortOldBackupTaskFunc);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTaskFunc, db_backup_range);
REGISTER_TASKFUNC_ALIAS(AbortOldBackupTaskFunc, db_finish_full_backup);
@ -1328,9 +1318,9 @@ namespace dbBackup {
//Upgrade DR from 5.1
struct CopyDiffLogsUpgradeTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Key logUidValue = task->params[DatabaseBackupAgent::keyConfigLogUid];
state Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
state Subspace config = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(logUidValue);
@ -1435,14 +1425,13 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef CopyDiffLogsUpgradeTaskFunc::name = LiteralStringRef("db_copy_diff_logs");
const uint32_t CopyDiffLogsUpgradeTaskFunc::version = 1;
REGISTER_TASKFUNC(CopyDiffLogsUpgradeTaskFunc);
struct BackupRestorableTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(task->params[BackupAgentBase::keyConfigLogUid]);
wait(checkTaskVersion(cx, task, BackupRestorableTaskFunc::name, BackupRestorableTaskFunc::version));
state Transaction tr(taskBucket->src);
@ -1527,14 +1516,13 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupRestorableTaskFunc::name = LiteralStringRef("dr_backup_restorable");
const uint32_t BackupRestorableTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupRestorableTaskFunc);
struct StartFullBackupTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Key logUidValue = task->params[DatabaseBackupAgent::keyConfigLogUid];
state Subspace sourceStates = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
wait(checkTaskVersion(cx, task, StartFullBackupTaskFunc::name, StartFullBackupTaskFunc::version));
@ -1726,7 +1714,6 @@ namespace dbBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef StartFullBackupTaskFunc::name = LiteralStringRef("dr_start_full_backup");
const uint32_t StartFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
}
@ -1822,7 +1809,7 @@ void checkAtomicSwitchOverConfig(StatusObjectReader srcStatus, StatusObjectReade
class DatabaseBackupAgentImpl {
public:
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
static constexpr int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
ACTOR static Future<Void> waitUpgradeToLatestDrVersion(DatabaseBackupAgent* backupAgent, Database cx, Key tagName) {
state UID logUid = wait(backupAgent->getLogUid(cx, tagName));

View File

@ -257,7 +257,7 @@ struct Traceable<std::set<T>> : std::true_type {
std::string printable( const StringRef& val );
std::string printable( const std::string& val );
std::string printable( const KeyRangeRef& range );
std::string printable(const VectorRef<KeyRangeRef>& val);
std::string printable( const VectorRef<KeyRangeRef>& val);
std::string printable( const VectorRef<StringRef>& val );
std::string printable( const VectorRef<KeyValueRef>& val );
std::string printable( const KeyValueRef& val );

View File

@ -623,8 +623,6 @@ namespace fileBackup {
// Very simple format compared to KeyRange files.
// Header, [Key, Value]... Key len
struct LogFileWriter {
static const std::string &FFs;
LogFileWriter(Reference<IBackupFile> file = Reference<IBackupFile>(), int blockSize = 0)
: file(file), blockSize(blockSize), blockEnd(0) {}
@ -940,14 +938,14 @@ namespace fileBackup {
// Backup and Restore taskFunc definitions will inherit from one of the following classes which
// serve to catch and log to the appropriate config any error that execute/finish didn't catch and log.
struct RestoreTaskFuncBase : TaskFuncBase {
Future<Void> handleError(Database cx, Reference<Task> task, Error const& error) final override {
Future<Void> handleError(Database cx, Reference<Task> task, Error const& error) final {
return RestoreConfig(task).logError(cx, error, format("'%s' on '%s'", error.what(), task->params[Task::reservedTaskParamKeyType].printable().c_str()));
}
virtual std::string toString(Reference<Task> task) const { return ""; }
};
struct BackupTaskFuncBase : TaskFuncBase {
Future<Void> handleError(Database cx, Reference<Task> task, Error const& error) final override {
Future<Void> handleError(Database cx, Reference<Task> task, Error const& error) final {
return BackupConfig(task).logError(cx, error, format("'%s' on '%s'", error.what(), task->params[Task::reservedTaskParamKeyType].printable().c_str()));
}
virtual std::string toString(Reference<Task> task) const { return ""; }
@ -970,9 +968,9 @@ namespace fileBackup {
struct BackupRangeTaskFunc : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<Key> beginKey() {
return LiteralStringRef(__FUNCTION__);
}
@ -1265,14 +1263,13 @@ namespace fileBackup {
};
StringRef BackupRangeTaskFunc::name = LiteralStringRef("file_backup_write_range_5.2");
const uint32_t BackupRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupRangeTaskFunc);
struct BackupSnapshotDispatchTask : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
// Set by Execute, used by Finish
static TaskParam<int64_t> shardsBehind() {
return LiteralStringRef(__FUNCTION__);
@ -1786,14 +1783,13 @@ namespace fileBackup {
};
StringRef BackupSnapshotDispatchTask::name = LiteralStringRef("file_backup_dispatch_ranges_5.2");
const uint32_t BackupSnapshotDispatchTask::version = 1;
REGISTER_TASKFUNC(BackupSnapshotDispatchTask);
struct BackupLogRangeTaskFunc : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<bool> addBackupLogRangeTasks() {
return LiteralStringRef(__FUNCTION__);
}
@ -1988,14 +1984,13 @@ namespace fileBackup {
};
StringRef BackupLogRangeTaskFunc::name = LiteralStringRef("file_backup_write_logs_5.2");
const uint32_t BackupLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(BackupLogRangeTaskFunc);
//This task stopped being used in 6.2; however, the code remains here to handle upgrades.
struct EraseLogRangeTaskFunc : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
static struct {
static TaskParam<Version> beginVersion() {
@ -2045,16 +2040,15 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef EraseLogRangeTaskFunc::name = LiteralStringRef("file_backup_erase_logs_5.2");
const uint32_t EraseLogRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(EraseLogRangeTaskFunc);
struct BackupLogsDispatchTask : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<Version> prevBeginVersion() {
return LiteralStringRef(__FUNCTION__);
}
@ -2173,14 +2167,13 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupLogsDispatchTask::name = LiteralStringRef("file_backup_dispatch_logs_5.2");
const uint32_t BackupLogsDispatchTask::version = 1;
REGISTER_TASKFUNC(BackupLogsDispatchTask);
struct FileBackupFinishedTask : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
ACTOR static Future<Void> _finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
wait(checkTaskVersion(tr->getDatabase(), task, FileBackupFinishedTask::name, FileBackupFinishedTask::version));
@ -2214,13 +2207,12 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef FileBackupFinishedTask::name = LiteralStringRef("file_backup_finished_5.2");
const uint32_t FileBackupFinishedTask::version = 1;
REGISTER_TASKFUNC(FileBackupFinishedTask);
struct BackupSnapshotManifest : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<Version> endVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
@ -2375,7 +2367,6 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef BackupSnapshotManifest::name = LiteralStringRef("file_backup_write_snapshot_manifest_5.2");
const uint32_t BackupSnapshotManifest::version = 1;
REGISTER_TASKFUNC(BackupSnapshotManifest);
Future<Key> BackupSnapshotDispatchTask::addSnapshotManifestTask(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> taskBucket, Reference<Task> parentTask, TaskCompletionKey completionKey, Reference<TaskFuture> waitFor) {
@ -2384,9 +2375,9 @@ namespace fileBackup {
struct StartFullBackupTaskFunc : BackupTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<Version> beginVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
@ -2528,7 +2519,6 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef StartFullBackupTaskFunc::name = LiteralStringRef("file_backup_start_5.2");
const uint32_t StartFullBackupTaskFunc::version = 1;
REGISTER_TASKFUNC(StartFullBackupTaskFunc);
struct RestoreCompleteTaskFunc : RestoreTaskFuncBase {
@ -2571,15 +2561,14 @@ namespace fileBackup {
}
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef RestoreCompleteTaskFunc::name = LiteralStringRef("restore_complete");
const uint32_t RestoreCompleteTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreCompleteTaskFunc);
struct RestoreFileTaskFuncBase : RestoreTaskFuncBase {
@ -2835,20 +2824,19 @@ namespace fileBackup {
}
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef RestoreRangeTaskFunc::name = LiteralStringRef("restore_range_data");
const uint32_t RestoreRangeTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreRangeTaskFunc);
struct RestoreLogDataTaskFunc : RestoreFileTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
static struct : InputParams {
} Params;
@ -2990,13 +2978,12 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef RestoreLogDataTaskFunc::name = LiteralStringRef("restore_log_data");
const uint32_t RestoreLogDataTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreLogDataTaskFunc);
struct RestoreDispatchTaskFunc : RestoreTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
static struct {
static TaskParam<Version> beginVersion() { return LiteralStringRef(__FUNCTION__); }
@ -3303,7 +3290,6 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef RestoreDispatchTaskFunc::name = LiteralStringRef("restore_dispatch");
const uint32_t RestoreDispatchTaskFunc::version = 1;
REGISTER_TASKFUNC(RestoreDispatchTaskFunc);
ACTOR Future<std::string> restoreStatus(Reference<ReadYourWritesTransaction> tr, Key tagName) {
@ -3397,9 +3383,9 @@ namespace fileBackup {
struct StartFullRestoreTaskFunc : RestoreTaskFuncBase {
static StringRef name;
static const uint32_t version;
static constexpr uint32_t version = 1;
static struct {
static TaskParam<Version> firstVersion() { return LiteralStringRef(__FUNCTION__); }
} Params;
@ -3592,7 +3578,6 @@ namespace fileBackup {
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
};
StringRef StartFullRestoreTaskFunc::name = LiteralStringRef("restore_start");
const uint32_t StartFullRestoreTaskFunc::version = 1;
REGISTER_TASKFUNC(StartFullRestoreTaskFunc);
}
@ -3608,7 +3593,7 @@ struct LogInfo : public ReferenceCounted<LogInfo> {
class FileBackupAgentImpl {
public:
static const int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
static constexpr int MAX_RESTORABLE_FILE_METASECTION_BYTES = 1024 * 8;
// Parallel restore
ACTOR static Future<Void> parallelRestoreFinish(Database cx, UID randomUID, bool unlockDB = true) {

View File

@ -1185,9 +1185,9 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::MACHINE_ID:
clientLocality = LocalityData( clientLocality.processId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>(), clientLocality.machineId(), clientLocality.dcId() );
if (clientInfo->get().commitProxies.size())
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies));
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies, false));
if( clientInfo->get().grvProxies.size() )
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies ) );
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies, true) );
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -1197,9 +1197,9 @@ void DatabaseContext::setOption( FDBDatabaseOptions::Option option, Optional<Str
case FDBDatabaseOptions::DATACENTER_ID:
clientLocality = LocalityData(clientLocality.processId(), clientLocality.zoneId(), clientLocality.machineId(), value.present() ? Standalone<StringRef>(value.get()) : Optional<Standalone<StringRef>>());
if (clientInfo->get().commitProxies.size())
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies));
commitProxies = Reference<CommitProxyInfo>( new CommitProxyInfo(clientInfo->get().commitProxies, false));
if( clientInfo->get().grvProxies.size() )
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies ));
grvProxies = Reference<GrvProxyInfo>( new GrvProxyInfo( clientInfo->get().grvProxies, true));
server_interf.clear();
locationCache.insert( allKeys, Reference<LocationInfo>() );
break;
@ -1585,11 +1585,11 @@ void DatabaseContext::updateProxies() {
grvProxies.clear();
bool commitProxyProvisional = false, grvProxyProvisional = false;
if (clientInfo->get().commitProxies.size()) {
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies));
commitProxies = Reference<CommitProxyInfo>(new CommitProxyInfo(clientInfo->get().commitProxies, false));
commitProxyProvisional = clientInfo->get().commitProxies[0].provisional;
}
if (clientInfo->get().grvProxies.size()) {
grvProxies = Reference<GrvProxyInfo>(new GrvProxyInfo(clientInfo->get().grvProxies));
grvProxies = Reference<GrvProxyInfo>(new GrvProxyInfo(clientInfo->get().grvProxies, true));
grvProxyProvisional = clientInfo->get().grvProxies[0].provisional;
}
if (clientInfo->get().commitProxies.size() && clientInfo->get().grvProxies.size()) {

View File

@ -1313,7 +1313,8 @@ Future< Standalone<RangeResultRef> > ReadYourWritesTransaction::getRange(
bool reverse )
{
if (getDatabase()->apiVersionAtLeast(630)) {
if (specialKeys.contains(begin.getKey()) && end.getKey() <= specialKeys.end) {
if (specialKeys.contains(begin.getKey()) && specialKeys.begin <= end.getKey() &&
end.getKey() <= specialKeys.end) {
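// At this point both selector keys are known to lie within the bounds of the special key space.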
TEST(true); // Special key space get range
return getDatabase()->specialKeySpace->getRange(this, begin, end, limits, reverse);
}

View File

@ -490,6 +490,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
)statusSchema"
R"statusSchema(
"recovery_state":{
"seconds_since_last_recovered":1,
"required_resolvers":1,
"required_commit_proxies":1,
"required_grv_proxies":1,

View File

@ -151,7 +151,7 @@ ACTOR Future<Void> normalizeKeySelectorActor(SpecialKeySpace* sks, ReadYourWrite
if (!ks->isFirstGreaterOrEqual()) {
// The Key Selector clamps up to the legal key space
TraceEvent(SevInfo, "ReadToBoundary")
TraceEvent(SevDebug, "ReadToBoundary")
.detail("TerminateKey", ks->getKey())
.detail("TerminateOffset", ks->offset);
if (ks->offset < 1)

View File

@ -69,7 +69,7 @@ REGISTER_TASKFUNC(AddTaskFunc);
struct IdleTaskFunc : TaskFuncBase {
static StringRef name;
static const uint32_t version = 1;
static constexpr uint32_t version = 1;
StringRef getName() const { return name; };
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return Void(); };

View File

@ -256,7 +256,7 @@ public:
return pauseKey;
}
Subspace getAvailableSpace(int priority = 0) {
Subspace getAvailableSpace(int priority = 0) const {
if(priority == 0)
return available;
return available_prioritized.get(priority);

View File

@ -186,16 +186,7 @@ struct PingReceiver final : NetworkMessageReceiver {
class TransportData {
public:
TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
transportId(transportId),
numIncompatibleConnections(0)
{
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
}
TransportData(uint64_t transportId);
~TransportData();
@ -219,6 +210,7 @@ public:
std::unordered_map<NetworkAddress, Reference<struct Peer>> peers;
std::unordered_map<NetworkAddress, std::pair<double, double>> closedPeers;
HealthMonitor healthMonitor;
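// Sorted set of public peer addresses; pingLatencyLogger round-robins through it.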
std::set<NetworkAddress> orderedAddresses;
Reference<AsyncVar<bool>> degraded;
bool warnAlwaysForLargePacket;
@ -243,8 +235,58 @@ public:
uint64_t transportId;
Future<Void> multiVersionCleanup;
Future<Void> pingLogger;
};
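// Cycles through the public peers in address order; once a peer has accumulated
// enough ping samples, emits a PingLatency trace event with latency statistics and
// the bytes sent/received since that peer was last logged, then resets the samples.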
ACTOR Future<Void> pingLatencyLogger(TransportData* self) {
state NetworkAddress lastAddress = NetworkAddress();
loop {
if(self->orderedAddresses.size()) {
auto it = self->orderedAddresses.upper_bound(lastAddress);
if(it == self->orderedAddresses.end()) {
it = self->orderedAddresses.begin();
}
lastAddress = *it;
auto peer = self->getPeer(lastAddress);
if(!peer) {
TraceEvent(SevWarnAlways, "MissingNetworkAddress").suppressFor(10.0).detail("PeerAddr", lastAddress);
}
if(peer && peer->pingLatencies.getPopulationSize() >= 10) {
TraceEvent("PingLatency")
.detail("PeerAddr", lastAddress)
.detail("MinLatency", peer->pingLatencies.min())
.detail("MaxLatency", peer->pingLatencies.max())
.detail("MeanLatency", peer->pingLatencies.mean())
.detail("MedianLatency", peer->pingLatencies.median())
.detail("P90Latency", peer->pingLatencies.percentile(0.90))
.detail("Count", peer->pingLatencies.getPopulationSize())
.detail("BytesReceived", peer->bytesReceived - peer->lastLoggedBytesReceived)
.detail("BytesSent", peer->bytesSent - peer->lastLoggedBytesSent);
peer->pingLatencies.clear();
peer->lastLoggedBytesReceived = peer->bytesReceived;
peer->lastLoggedBytesSent = peer->bytesSent;
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
} else if(it == self->orderedAddresses.begin()) {
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
}
} else {
wait(delay(FLOW_KNOBS->PING_LOGGING_INTERVAL));
}
}
}
TransportData::TransportData(uint64_t transportId)
: endpointNotFoundReceiver(endpoints),
pingReceiver(endpoints),
warnAlwaysForLargePacket(true),
lastIncompatibleMessage(0),
transportId(transportId),
numIncompatibleConnections(0)
{
degraded = Reference<AsyncVar<bool>>( new AsyncVar<bool>(false) );
pingLogger = pingLatencyLogger(this);
}
#define CONNECT_PACKET_V0 0x0FDB00A444020001LL
#define CONNECT_PACKET_V0_SIZE 14
@ -370,10 +412,14 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
FlowTransport::transport().sendUnreliable( SerializeSource<ReplyPromise<Void>>(reply), remotePingEndpoint, true );
state int64_t startingBytes = peer->bytesReceived;
state int timeouts = 0;
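// Record when this ping round trip began; sampled into pingLatencies for public peers on reply or timeout.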
state double startTime = now();
loop {
choose {
when (wait( delay( FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT ) )) {
if(startingBytes == peer->bytesReceived) {
if(peer->destination.isPublic()) {
peer->pingLatencies.addSample(now() - startTime);
}
TraceEvent("ConnectionTimeout").suppressFor(1.0).detail("WithAddr", peer->destination);
throw connection_failed();
}
@ -387,6 +433,9 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
timeouts++;
}
when (wait( reply.getFuture() )) {
if(peer->destination.isPublic()) {
peer->pingLatencies.addSample(now() - startTime);
}
break;
}
when (wait( peer->resetPing.onTrigger())) {
@ -409,9 +458,9 @@ ACTOR Future<Void> connectionWriter( Reference<Peer> self, Reference<IConnection
loop {
lastWriteTime = now();
int sent = conn->write(self->unsent.getUnsent(), FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if (sent != 0) {
int sent = conn->write(self->unsent.getUnsent(), /* limit= */ FLOW_KNOBS->MAX_PACKET_SEND_BYTES);
if (sent) {
self->bytesSent += sent;
self->transport->bytesSent += sent;
self->unsent.sent(sent);
}
@ -658,6 +707,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
self->transport->orderedAddresses.erase(self->destination);
return Void();
}
}
@ -667,8 +717,9 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
Peer::Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0),
bytesSent(0), lastLoggedBytesSent(0) {
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}
@ -1146,6 +1197,9 @@ Reference<Peer> TransportData::getOrOpenPeer( NetworkAddress const& address, boo
peer->connect = connectionKeeper(peer);
}
peers[address] = peer;
if(address.isPublic()) {
orderedAddresses.insert(address);
}
}
return peer;

View File

@ -28,6 +28,7 @@
#include "flow/network.h"
#include "flow/FileIdentifier.h"
#include "flow/Net2Packet.h"
#include "fdbrpc/ContinuousSample.h"
#pragma pack(push, 4)
class Endpoint {
@ -140,8 +141,12 @@ struct Peer : public ReferenceCounted<Peer> {
int peerReferences;
bool incompatibleProtocolVersionNewer;
int64_t bytesReceived;
int64_t bytesSent;
double lastDataPacketSentTime;
int outstandingReplies;
ContinuousSample<double> pingLatencies;
int64_t lastLoggedBytesReceived;
int64_t lastLoggedBytesSent;
explicit Peer(TransportData* transport, NetworkAddress const& destination);

View File

@ -86,7 +86,10 @@ struct AlternativeInfo {
template <class T>
class ModelInterface : public ReferenceCounted<ModelInterface<T>> {
public:
ModelInterface( const vector<T>& v ) {
//If balanceOnRequests is true, the client will load balance based on the number of GRVs released by each proxy
//If balanceOnRequests is false, the client will load balance based on the CPU usage of each proxy
//Only requests which take from the GRV budget on the proxy should set balanceOnRequests to true
ModelInterface( const vector<T>& v, bool balanceOnRequests ) : balanceOnRequests(balanceOnRequests) {
for(int i = 0; i < v.size(); i++) {
alternatives.push_back(AlternativeInfo(v[i], 1.0/v.size(), (i+1.0)/v.size()));
}
@ -111,22 +114,26 @@ public:
}
void updateProbabilities() {
double totalBusyTime = 0;
double totalBusy = 0;
for(auto& it : alternatives) {
totalBusyTime += it.processBusyTime;
int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION :
it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
totalBusy += busyMetric;
if(now() - it.lastUpdate > FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/2.0) {
return;
}
}
//Do not update probabilities if the average proxy busyness is less than 5%
//With the balancing-mode change below, this now means: skip the update when the proxies are not busy enough, measured in released requests or CPU time depending on balanceOnRequests
if(totalBusyTime < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_AMOUNT*alternatives.size()) {
if((balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_REQUESTS*alternatives.size()) ||
(!balanceOnRequests && totalBusy < FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION*FLOW_KNOBS->BASIC_LOAD_BALANCE_MIN_CPU*alternatives.size())) {
return;
}
double totalProbability = 0;
for(auto& it : alternatives) {
it.probability += (1.0/alternatives.size()-(it.processBusyTime/totalBusyTime))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
int busyMetric = balanceOnRequests ? it.processBusyTime/FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION :
it.processBusyTime%FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
it.probability += (1.0/alternatives.size()-(busyMetric/totalBusy))*FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_CHANGE;
it.probability = std::max(it.probability, 1/(FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB*alternatives.size()));
it.probability = std::min(it.probability, FLOW_KNOBS->BASIC_LOAD_BALANCE_MAX_PROB/alternatives.size());
totalProbability += it.probability;
@ -160,6 +167,7 @@ public:
private:
vector<AlternativeInfo<T>> alternatives;
Future<Void> updater;
bool balanceOnRequests;
};
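For intuition about the combined busyness metric consumed above (an illustrative sketch, not part of the diff): the GRV proxy packs a request count into the high digits of processBusyTime and a scaled CPU fraction into the low digits, so the two balancing modes recover their metric with integer division and modulo respectively. Assuming a precision of 10000 standing in for FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION:

    #include <cstdio>
    int main() {
        const int precision = 10000;   // assumed value, for illustration only
        int grvsReleased = 37;         // requests released by this proxy
        double cpuBusyness = 0.42;     // run loop busyness in [0, 1)
        int processBusyTime = precision * grvsReleased + (int)(precision * cpuBusyness);
        printf("requests=%d cpuPart=%d\n", processBusyTime / precision, processBusyTime % precision);
        // prints: requests=37 cpuPart=4200
        return 0;
    }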
template <class T>

View File

@ -189,6 +189,8 @@ public:
void insert( const Range& keys, const Val& value );
Future<Void> clearAsync() { return map.clearAsync(); }
protected:
Map<Key,Val,pair_type,Metric> map;
const MetricFunc mf;

View File

@ -429,7 +429,7 @@ struct BackupData {
ACTOR static Future<Version> _getMinKnownCommittedVersion(BackupData* self) {
state Span span("BA:GetMinCommittedVersion"_loc);
loop {
GetReadVersionRequest request(span.context, 1, TransactionPriority::DEFAULT,
GetReadVersionRequest request(span.context, 0, TransactionPriority::DEFAULT,
GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION);
choose {
when(wait(self->cx->onProxiesChanged())) {}

View File

@ -182,6 +182,7 @@ set(FDBSERVER_SRCS
workloads/ReadWrite.actor.cpp
workloads/RemoveServersSafely.actor.cpp
workloads/ReportConflictingKeys.actor.cpp
workloads/RestoreBackup.actor.cpp
workloads/Rollback.actor.cpp
workloads/RyowCorrectness.actor.cpp
workloads/RYWDisable.actor.cpp
@ -197,6 +198,7 @@ set(FDBSERVER_SRCS
workloads/StatusWorkload.actor.cpp
workloads/Storefront.actor.cpp
workloads/StreamingRead.actor.cpp
workloads/SubmitBackup.actor.cpp
workloads/TagThrottleApi.actor.cpp
workloads/TargetedKill.actor.cpp
workloads/TaskBucketCorrectness.actor.cpp

View File

@ -536,6 +536,16 @@ ACTOR Future<Void> preresolutionProcessing(CommitBatchContext* self) {
state const Optional<UID>& debugID = self->debugID;
state Span span("MP:preresolutionProcessing"_loc, self->span.context);
if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchResolving.get() >
SERVER_KNOBS->RESET_MASTER_BATCHES &&
now() - self->pProxyCommitData->lastMasterReset > SERVER_KNOBS->RESET_MASTER_DELAY) {
TraceEvent(SevWarnAlways, "ResetMasterNetwork")
.detail("CurrentBatch", self->localBatchNumber)
.detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchResolving.get());
FlowTransport::transport().resetConnection(self->pProxyCommitData->master.address());
self->pProxyCommitData->lastMasterReset = now();
}
// Pre-resolution the commits
TEST(pProxyCommitData->latestLocalCommitBatchResolving.get() < localBatchNumber - 1);
wait(pProxyCommitData->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber - 1));
@ -630,6 +640,18 @@ ACTOR Future<Void> getResolution(CommitBatchContext* self) {
pProxyCommitData, self->releaseDelay, self->localBatchNumber
);
if (self->localBatchNumber - self->pProxyCommitData->latestLocalCommitBatchLogging.get() >
SERVER_KNOBS->RESET_RESOLVER_BATCHES &&
now() - self->pProxyCommitData->lastResolverReset > SERVER_KNOBS->RESET_RESOLVER_DELAY) {
TraceEvent(SevWarnAlways, "ResetResolverNetwork")
.detail("CurrentBatch", self->localBatchNumber)
.detail("InProcessBatch", self->pProxyCommitData->latestLocalCommitBatchLogging.get());
for (int r = 0; r < self->pProxyCommitData->resolvers.size(); r++) {
FlowTransport::transport().resetConnection(self->pProxyCommitData->resolvers[r].address());
}
self->pProxyCommitData->lastResolverReset = now();
}
// Wait for the final resolution
std::vector<ResolveTransactionBatchReply> resolutionResp = wait(getAll(replies));
self->resolution.swap(*const_cast<std::vector<ResolveTransactionBatchReply>*>(&resolutionResp));
@ -1069,14 +1091,16 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
}
self->computeDuration += g_network->timer() - self->computeStart;
if(self->computeDuration > SERVER_KNOBS->MIN_PROXY_COMPUTE && self->batchOperations > 0) {
double computePerOperation = self->computeDuration / self->batchOperations;
if(computePerOperation <= pProxyCommitData->commitComputePerOperation[self->latencyBucket]) {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = computePerOperation;
} else {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
}
if (self->batchOperations > 0) {
double computePerOperation = std::min( SERVER_KNOBS->MAX_COMPUTE_PER_OPERATION, self->computeDuration / self->batchOperations );
if(computePerOperation <= pProxyCommitData->commitComputePerOperation[self->latencyBucket]) {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = computePerOperation;
} else {
pProxyCommitData->commitComputePerOperation[self->latencyBucket] = SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE*computePerOperation + ((1.0-SERVER_KNOBS->PROXY_COMPUTE_GROWTH_RATE)*pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
pProxyCommitData->stats.maxComputeNS = std::max<int64_t>(pProxyCommitData->stats.maxComputeNS, 1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
pProxyCommitData->stats.minComputeNS = std::min<int64_t>(pProxyCommitData->stats.minComputeNS, 1e9 * pProxyCommitData->commitComputePerOperation[self->latencyBucket]);
}
return Void();
}
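Worked example of the smoothing above (illustrative numbers; 0.01 is the PROXY_COMPUTE_GROWTH_RATE default set later in this diff): if the stored estimate for a latency bucket is 0.010s per operation and a batch observes 0.050s per operation, the estimate moves only to 0.01*0.050 + 0.99*0.010 = 0.0104s, while an observation at or below the estimate replaces it outright. Growth is therefore gradual but decay is immediate, and the new MAX_COMPUTE_PER_OPERATION cap bounds how far a single pathological batch can push the observation.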

View File

@ -181,8 +181,11 @@ public:
}
vector<StorageServerInterface> getLastKnownServerInterfaces() const override {
vector<StorageServerInterface> v(servers.size());
for (const auto& server : servers) v.push_back(server->lastKnownInterface);
vector<StorageServerInterface> v;
v.reserve(servers.size());
for (const auto& server : servers) {
v.push_back(server->lastKnownInterface);
}
return v;
}
int size() const override {
@ -3912,13 +3915,17 @@ ACTOR Future<Void> storageServerTracker(
}
}
} catch( Error &e ) {
if (e.code() != error_code_actor_cancelled && errorOut.canBeSet())
errorOut.sendError(e);
state Error err = e;
TraceEvent("StorageServerTrackerCancelled", self->distributorId)
.suppressFor(1.0)
.detail("Primary", self->primary)
.detail("Server", server->id);
throw;
.detail("Server", server->id)
.error(e, /*includeCancelled*/ true);
if (e.code() != error_code_actor_cancelled && errorOut.canBeSet()) {
errorOut.sendError(e);
wait(delay(0)); // Check for cancellation, since errorOut.sendError(e) could delete self
}
throw err;
}
}
@ -4491,7 +4498,7 @@ ACTOR Future<Void> monitorBatchLimitedTime(Reference<AsyncVar<ServerDBInfo>> db,
loop {
wait( delay(SERVER_KNOBS->METRIC_UPDATE_RATE) );
state Reference<GrvProxyInfo> grvProxies(new GrvProxyInfo(db->get().client.grvProxies));
state Reference<GrvProxyInfo> grvProxies(new GrvProxyInfo(db->get().client.grvProxies, false));
choose {
when (wait(db->onChange())) {}
@ -4673,6 +4680,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
zeroHealthyTeams.push_back(Reference<AsyncVar<bool>>( new AsyncVar<bool>(true) ));
int storageTeamSize = configuration.storageTeamSize;
// Stored outside of data distribution tracker to avoid slow tasks
// when tracker is cancelled
state KeyRangeMap<ShardTrackedData> shards;
vector<Future<Void>> actors;
if (configuration.usableRegions > 1) {
tcis.push_back(TeamCollectionInterface());
@ -4685,8 +4696,12 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
anyZeroHealthyTeams = zeroHealthyTeams[0];
}
actors.push_back(pollMoveKeysLock(cx, lock, ddEnabledState));
actors.push_back( reportErrorsExcept( dataDistributionTracker( initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics, getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart, anyZeroHealthyTeams, self->ddId ), "DDTracker", self->ddId, &normalDDQueueErrors() ) );
actors.push_back( pollMoveKeysLock(cx, lock, ddEnabledState) );
actors.push_back(reportErrorsExcept(
dataDistributionTracker(initData, cx, output, shardsAffectedByTeamFailure, getShardMetrics,
getShardMetricsList, getAverageShardBytes.getFuture(), readyToStart,
anyZeroHealthyTeams, self->ddId, &shards),
"DDTracker", self->ddId, &normalDDQueueErrors()));
actors.push_back(reportErrorsExcept(
dataDistributionQueue(cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis,
shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId,
@ -4721,9 +4736,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self,
self->teamCollection = nullptr;
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
if( e.code() != error_code_movekeys_conflict )
throw err;
bool ddEnabled = wait(isDataDistributionEnabled(cx, ddEnabledState));
wait(shards.clearAsync());
if (err.code() != error_code_movekeys_conflict) throw err;
bool ddEnabled = wait( isDataDistributionEnabled(cx, ddEnabledState) );
TraceEvent("DataDistributionMoveKeysConflict").detail("DataDistributionEnabled", ddEnabled).error(err);
if( ddEnabled )
throw err;

View File

@ -211,21 +211,54 @@ struct InitialDataDistribution : ReferenceCounted<InitialDataDistribution> {
Optional<Key> initHealthyZoneValue;
};
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> zeroHealthyTeams,
UID distributorId);
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
bool operator==(ShardMetrics const& rhs) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime &&
shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount)
: metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> zeroHealthyTeams,
UID distributorId,
KeyRangeMap<ShardTrackedData>* shards);
ACTOR Future<Void> dataDistributionQueue(
Database cx, PromiseStream<RelocateShard> output, FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics, Reference<AsyncVar<bool>> processingUnhealthy,
vector<TeamCollectionInterface> teamCollection, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock, PromiseStream<Promise<int64_t>> getAverageShardBytes, UID distributorId, int teamSize,
int singleRegionTeamSize, double* lastLimited, const DDEnabledState* ddEnabledState);
Database cx,
PromiseStream<RelocateShard> output,
FutureStream<RelocateShard> input,
PromiseStream<GetMetricsRequest> getShardMetrics,
Reference<AsyncVar<bool>> processingUnhealthy,
vector<TeamCollectionInterface> teamCollection,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
MoveKeysLock lock,
PromiseStream<Promise<int64_t>> getAverageShardBytes,
UID distributorId,
int teamSize,
int singleRegionTeamSize,
double* lastLimited,
const DDEnabledState* ddEnabledState);
//Holds the permitted size and IO Bounds for a shard
struct ShardSizeBounds {

View File

@ -95,7 +95,7 @@ class ParallelTCInfo final : public ReferenceCounted<ParallelTCInfo>, public IDa
template <class T>
vector<T> collect(std::function<vector<T>(IDataDistributionTeam const&)> func) const {
vector<T> result(teams.size());
vector<T> result;
for (const auto& team : teams) {
vector<T> newItems = func(*team);

View File

@ -35,18 +35,6 @@ enum BandwidthStatus {
enum ReadBandwidthStatus { ReadBandwidthStatusNormal, ReadBandwidthStatusHigh };
struct ShardMetrics {
StorageMetrics metrics;
double lastLowBandwidthStartTime;
int shardCount;
bool operator == ( ShardMetrics const& rhs ) const {
return metrics == rhs.metrics && lastLowBandwidthStartTime == rhs.lastLowBandwidthStartTime && shardCount == rhs.shardCount;
}
ShardMetrics(StorageMetrics const& metrics, double lastLowBandwidthStartTime, int shardCount) : metrics(metrics), lastLowBandwidthStartTime(lastLowBandwidthStartTime), shardCount(shardCount) {}
};
BandwidthStatus getBandwidthStatus( StorageMetrics const& metrics ) {
if( metrics.bytesPerKSecond > SERVER_KNOBS->SHARD_MAX_BYTES_PER_KSEC )
return BandwidthStatusHigh;
@ -81,16 +69,11 @@ ACTOR Future<Void> updateMaxShardSize( Reference<AsyncVar<int64_t>> dbSizeEstima
}
}
struct ShardTrackedData {
Future<Void> trackShard;
Future<Void> trackBytes;
Reference<AsyncVar<Optional<ShardMetrics>>> stats;
};
struct DataDistributionTracker {
Database cx;
UID distributorId;
KeyRangeMap< ShardTrackedData > shards;
KeyRangeMap<ShardTrackedData>& shards;
ActorCollection sizeChanges;
int64_t systemSizeEstimate;
@ -108,16 +91,19 @@ struct DataDistributionTracker {
// Read hot detection
PromiseStream<KeyRange> readHotShard;
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart, PromiseStream<RelocateShard> const& output, Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure, Reference<AsyncVar<bool>> anyZeroHealthyTeams)
: cx(cx), distributorId( distributorId ), dbSizeEstimate( new AsyncVar<int64_t>() ), systemSizeEstimate(0),
maxShardSize( new AsyncVar<Optional<int64_t>>() ),
sizeChanges(false), readyToStart(readyToStart), output( output ), shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams) {}
DataDistributionTracker(Database cx, UID distributorId, Promise<Void> const& readyToStart,
PromiseStream<RelocateShard> const& output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
Reference<AsyncVar<bool>> anyZeroHealthyTeams, KeyRangeMap<ShardTrackedData>& shards)
: cx(cx), distributorId(distributorId), dbSizeEstimate(new AsyncVar<int64_t>()), systemSizeEstimate(0),
maxShardSize(new AsyncVar<Optional<int64_t>>()), sizeChanges(false), readyToStart(readyToStart), output(output),
shardsAffectedByTeamFailure(shardsAffectedByTeamFailure), anyZeroHealthyTeams(anyZeroHealthyTeams),
shards(shards) {}
~DataDistributionTracker()
{
//Cancel all actors so they aren't waiting on a sizeChanges broken promise
sizeChanges.clear(false);
shards.insert( allKeys, ShardTrackedData() );
}
};
@ -363,7 +349,7 @@ ACTOR Future<int64_t> getFirstSize( Reference<AsyncVar<Optional<ShardMetrics>>>
ACTOR Future<Void> changeSizes( DataDistributionTracker* self, KeyRange keys, int64_t oldShardsEndingSize ) {
state vector<Future<int64_t>> sizes;
state vector<Future<int64_t>> systemSizes;
for (auto it : self->shards.intersectingRanges(keys) ) {
for (auto it : self->shards.intersectingRanges(keys)) {
Future<int64_t> thisSize = getFirstSize( it->value().stats );
sizes.push_back( thisSize );
if(it->range().begin >= systemKeys.begin) {
@ -722,7 +708,7 @@ ACTOR Future<Void> shardTracker(
}
void restartShardTrackers(DataDistributionTracker* self, KeyRangeRef keys, Optional<ShardMetrics> startingMetrics) {
auto ranges = self->shards.getAffectedRangesAfterInsertion( keys, ShardTrackedData() );
auto ranges = self->shards.getAffectedRangesAfterInsertion(keys, ShardTrackedData());
for(int i=0; i<ranges.size(); i++) {
if( !ranges[i].value.trackShard.isValid() && ranges[i].begin != keys.begin ) {
// When starting, key space will be full of "dummy" default constructed entries.
@ -780,7 +766,7 @@ ACTOR Future<Void> fetchShardMetrics_impl( DataDistributionTracker* self, GetMet
loop {
Future<Void> onChange;
StorageMetrics returnMetrics;
for( auto t : self->shards.intersectingRanges( req.keys ) ) {
for (auto t : self->shards.intersectingRanges(req.keys)) {
auto &stats = t.value().stats;
if( !stats->get().present() ) {
onChange = stats->onChange();
@ -816,7 +802,6 @@ ACTOR Future<Void> fetchShardMetrics( DataDistributionTracker* self, GetMetricsR
return Void();
}
ACTOR Future<Void> fetchShardMetricsList_impl( DataDistributionTracker* self, GetMetricsListRequest req ) {
try {
loop {
@ -865,19 +850,16 @@ ACTOR Future<Void> fetchShardMetricsList( DataDistributionTracker* self, GetMetr
return Void();
}
ACTOR Future<Void> dataDistributionTracker(
Reference<InitialDataDistribution> initData,
Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart,
Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId)
{
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure, anyZeroHealthyTeams);
ACTOR Future<Void> dataDistributionTracker(Reference<InitialDataDistribution> initData, Database cx,
PromiseStream<RelocateShard> output,
Reference<ShardsAffectedByTeamFailure> shardsAffectedByTeamFailure,
PromiseStream<GetMetricsRequest> getShardMetrics,
PromiseStream<GetMetricsListRequest> getShardMetricsList,
FutureStream<Promise<int64_t>> getAverageShardBytes,
Promise<Void> readyToStart, Reference<AsyncVar<bool>> anyZeroHealthyTeams,
UID distributorId, KeyRangeMap<ShardTrackedData>* shards) {
state DataDistributionTracker self(cx, distributorId, readyToStart, output, shardsAffectedByTeamFailure,
anyZeroHealthyTeams, *shards);
state Future<Void> loggingTrigger = Void();
state Future<Void> readHotDetect = readHotDetector(&self);
try {
@ -890,10 +872,10 @@ ACTOR Future<Void> dataDistributionTracker(
}
when( wait( loggingTrigger ) ) {
TraceEvent("DDTrackerStats", self.distributorId)
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest( "DDTrackerStats" );
.detail("Shards", self.shards.size())
.detail("TotalSizeBytes", self.dbSizeEstimate->get())
.detail("SystemSizeBytes", self.systemSizeEstimate)
.trackLatest("DDTrackerStats");
loggingTrigger = delay(SERVER_KNOBS->DATA_DISTRIBUTION_LOGGING_INTERVAL, TaskPriority::FlushTrace);
}

View File

@ -42,8 +42,35 @@ struct GrvProxyStats {
Future<Void> logger;
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
void updateRequestBuckets() {
while(now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
void addRequest(int transactionCount) {
updateRequestBuckets();
recentRequests += transactionCount;
requestBuckets.back() += transactionCount;
}
int getRecentRequests() {
updateRequestBuckets();
return recentRequests/(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE-(lastBucketBegin+bucketInterval-now()));
}
explicit GrvProxyStats(UID id)
: cc("GrvProxyStats", id.toString()), txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
: cc("GrvProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
@ -55,6 +82,9 @@ struct GrvProxyStats {
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
logger = traceCounters("GrvProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "GrvProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};
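A standalone sketch of the bucketed sliding window implemented above (assumptions: a 1-second window split into 10 buckets; the real values come from BASIC_LOAD_BALANCE_UPDATE_RATE and BASIC_LOAD_BALANCE_BUCKETS):

    #include <deque>
    struct WindowCounter {
        std::deque<int> buckets;
        double lastBucketBegin;
        double bucketInterval;
        int total = 0;
        WindowCounter(double now, int nBuckets = 10, double window = 1.0)
          : buckets(nBuckets, 0), lastBucketBegin(now), bucketInterval(window / nBuckets) {}
        void advance(double now) {
            // Rotate out buckets that aged past the window, as updateRequestBuckets() does.
            while (now - lastBucketBegin > bucketInterval) {
                lastBucketBegin += bucketInterval;
                total -= buckets.front();
                buckets.pop_front();
                buckets.push_back(0);
            }
        }
        void add(double now, int n) { advance(now); total += n; buckets.back() += n; }
    };

getRecentRequests() then converts the window total into a rate by dividing by the portion of the window that has actually elapsed (the update rate minus the unfilled remainder of the newest bucket), so a freshly rotated window does not underreport.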
@ -280,6 +310,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
} else {
stats->addRequest(req.transactionCount);
// TODO: check whether this is reasonable to do in the fast path
for(auto tag : req.tags) {
(*transactionTagCounter)[tag.first] += tag.second;
@ -389,8 +420,13 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(SpanID parentSpan, Grv
rep.version = repFromMaster.version;
rep.locked = repFromMaster.locked;
rep.metadataVersion = repFromMaster.metadataVersion;
rep.processBusyTime = 1e6 * (g_network->isSimulated() ? deterministicRandom()->random01() : g_network->networkInfo.metrics.lastRunLoopBusyness);
rep.processBusyTime =
FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION *
std::min((std::numeric_limits<int>::max() / FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION) - 1,
grvProxyData->stats.getRecentRequests());
rep.processBusyTime += FLOW_KNOBS->BASIC_LOAD_BALANCE_COMPUTE_PRECISION *
(g_network->isSimulated() ? deterministicRandom()->random01()
: g_network->networkInfo.metrics.lastRunLoopBusyness);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "GrvProxyServer.getLiveCommittedVersion.After");

View File

@ -63,8 +63,6 @@ public:
virtual void enableSnapshot() {}
virtual bool canPipelineCommits() const = 0;
/*
Concurrency contract
Causal consistency:

View File

@ -35,10 +35,10 @@ struct KeyValueStoreCompressTestData final : IKeyValueStore {
KeyValueStoreCompressTestData(IKeyValueStore* store) : store(store) {}
bool canPipelineCommits() const override {return false;}
Future<Void> getError() override { return store->getError(); }
Future<Void> onClosed() override { return store->onClosed(); }
void dispose() override {
virtual Future<Void> getError() override { return store->getError(); }
virtual Future<Void> onClosed() override { return store->onClosed(); }
virtual void dispose() override {
store->dispose();
delete this;
}

View File

@ -63,9 +63,7 @@ public:
// IKeyValueStore
KeyValueStoreType getType() const override { return type; }
virtual bool canPipelineCommits() const override { return false; }
std::tuple<size_t, size_t, size_t> getSize() const override { return data.size(); }
virtual std::tuple<size_t, size_t, size_t> getSize() const override { return data.size(); }
int64_t getAvailableSize() const {
int64_t residentSize = data.sumTo(data.end()) + queue.totalSize() + // doesn't account for overhead in queue

View File

@ -285,8 +285,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return errorPromise.getFuture();
}
bool canPipelineCommits() const override { return true; }
ACTOR static void doClose(RocksDBKeyValueStore* self, bool deleteOnClose) {
wait(self->readThreads->stop());
auto a = new Writer::CloseAction(self->path, deleteOnClose);

View File

@ -1451,9 +1451,8 @@ public:
Future<Void> getError() override { return delayed(readThreads->getError() || writeThread->getError()); }
Future<Void> onClosed() override { return stopped.getFuture(); }
KeyValueStoreType getType() const override { return type; }
StorageBytes getStorageBytes() const override;
bool canPipelineCommits() const override { return false; }
virtual KeyValueStoreType getType() const override { return type; }
virtual StorageBytes getStorageBytes() const override;
void set(KeyValueRef keyValue, const Arena* arena = nullptr) override;
void clear(KeyRangeRef range, const Arena* arena = nullptr) override;

View File

@ -348,7 +348,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( COMMIT_TRANSACTION_BATCH_COUNT_MAX, 32768 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_COUNT_MAX = 1000; // Do NOT increase this number beyond 32768, as CommitIds only budget 2 bytes for storing transaction id within each batch
init( COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT, 8LL << 30 ); if (randomize && BUGGIFY) COMMIT_BATCHES_MEM_BYTES_HARD_LIMIT = deterministicRandom()->randomInt64(100LL << 20, 8LL << 30);
init( COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL, 0.5 );
init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 10.0 );
init( COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR, 5.0 );
// these settings disable batch bytes scaling. Try COMMIT_TRANSACTION_BATCH_BYTES_MAX=1e6, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE=50000, COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER=0.5?
init( COMMIT_TRANSACTION_BATCH_BYTES_MIN, 100000 );
@ -368,13 +368,18 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REQUIRED_MIN_RECOVERY_DURATION, 0.080 ); if( shortRecoveryDuration ) REQUIRED_MIN_RECOVERY_DURATION = 0.01;
init( ALWAYS_CAUSAL_READ_RISKY, false );
init( MAX_COMMIT_UPDATES, 2000 ); if( randomize && BUGGIFY ) MAX_COMMIT_UPDATES = 1;
init( MIN_PROXY_COMPUTE, 0.001 );
init( MAX_PROXY_COMPUTE, 2.0 );
init( MAX_COMPUTE_PER_OPERATION, 0.1 );
init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( TXN_STATE_SEND_AMOUNT, 4 );
init( REPORT_TRANSACTION_COST_ESTIMATION_DELAY, 0.1 );
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );
init( RESET_MASTER_DELAY, 300.0 );
init( RESET_RESOLVER_DELAY, 300.0 );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
// by delay()ing for this amount of time between accepted batches of TransactionRequests.
@ -548,7 +553,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( FETCH_KEYS_LOWER_PRIORITY, 0 );
init( BUGGIFY_BLOCK_BYTES, 10000 );
init( STORAGE_COMMIT_BYTES, 10000000 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_BYTES = 2000000;
init( STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD, 100000 );
init( STORAGE_DURABILITY_LAG_REJECT_THRESHOLD, 0.25 );
init( STORAGE_DURABILITY_LAG_MIN_RATE, 0.1 );
init( STORAGE_COMMIT_INTERVAL, 0.5 ); if( randomize && BUGGIFY ) STORAGE_COMMIT_INTERVAL = 2.0;

View File

@ -299,13 +299,18 @@ public:
double REQUIRED_MIN_RECOVERY_DURATION;
bool ALWAYS_CAUSAL_READ_RISKY;
int MAX_COMMIT_UPDATES;
double MIN_PROXY_COMPUTE;
double MAX_PROXY_COMPUTE;
double MAX_COMPUTE_PER_OPERATION;
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
int TXN_STATE_SEND_AMOUNT;
double REPORT_TRANSACTION_COST_ESTIMATION_DELAY;
int RESET_MASTER_BATCHES;
int RESET_RESOLVER_BATCHES;
double RESET_MASTER_DELAY;
double RESET_RESOLVER_DELAY;
// Master Server
double COMMIT_SLEEP_TIME;
double MIN_BALANCE_TIME;
@ -481,7 +486,6 @@ public:
double STORAGE_DURABILITY_LAG_MIN_RATE;
int STORAGE_COMMIT_BYTES;
double STORAGE_COMMIT_INTERVAL;
int STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD;
double UPDATE_SHARD_VERSION_INTERVAL;
int BYTE_SAMPLING_FACTOR;
int BYTE_SAMPLING_OVERHEAD;

View File

@ -65,9 +65,24 @@ struct ProxyStats {
Future<Void> logger;
int64_t maxComputeNS;
int64_t minComputeNS;
int64_t getAndResetMaxCompute() {
int64_t r = maxComputeNS;
maxComputeNS = 0;
return r;
}
int64_t getAndResetMinCompute() {
int64_t r = minComputeNS;
minComputeNS = 1e12;
return r;
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()),
: cc("ProxyStats", id.toString()), maxComputeNS(0), minComputeNS(1e12),
txnCommitIn("TxnCommitIn", cc), txnCommitVersionAssigned("TxnCommitVersionAssigned", cc),
txnCommitResolving("TxnCommitResolving", cc), txnCommitResolved("TxnCommitResolved", cc),
txnCommitOut("TxnCommitOut", cc), txnCommitOutSuccess("TxnCommitOutSuccess", cc),
@ -84,6 +99,8 @@ struct ProxyStats {
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount",
[commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
specialCounter(cc, "MaxCompute", [this](){ return this->getAndResetMaxCompute(); });
specialCounter(cc, "MinCompute", [this](){ return this->getAndResetMinCompute(); });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
}
};
@ -142,6 +159,8 @@ struct ProxyCommitData {
vector<double> commitComputePerOperation;
UIDTransactionTagMap<TransactionCommitCostEstimation> ssTrTagCommitCost;
double lastMasterReset;
double lastResolverReset;
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
@ -205,7 +224,8 @@ struct ProxyCommitData {
commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), firstProxy(firstProxy),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0),
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0) {
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION),
lastCommitTime(0), lastMasterReset(now()), lastResolverReset(now()) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};

View File

@ -1047,12 +1047,19 @@ static JsonBuilderObject clientStatusFetcher(std::map<NetworkAddress, std::pair<
return clientStatus;
}
ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails mWorker, int workerCount, std::set<std::string> *incomplete_reasons, int* statusCode) {
ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(Database cx, WorkerDetails mWorker, int workerCount, std::set<std::string> *incomplete_reasons, int* statusCode) {
state JsonBuilderObject message;
state Transaction tr(cx);
try {
state Future<TraceEventFields> activeGens = timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryGenerations") ) ), 1.0);
TraceEventFields md = wait( timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state Future<TraceEventFields> mdActiveGensF = timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryGenerations") ) ), 1.0);
state Future<TraceEventFields> mdF = timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0);
state Future<TraceEventFields> mDBAvailableF = timeoutError(mWorker.interf.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryAvailable") ) ), 1.0);
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
state Future<ErrorOr<Version>> rvF = errorOr(timeoutError(tr.getReadVersion(), 1.0));
wait(success(mdActiveGensF) && success(mdF) && success(rvF) && success(mDBAvailableF));
const TraceEventFields& md = mdF.get();
int mStatusCode = md.getInt("StatusCode");
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -1060,6 +1067,18 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails
message = JsonString::makeMessage(RecoveryStatus::names[mStatusCode], RecoveryStatus::descriptions[mStatusCode]);
*statusCode = mStatusCode;
ErrorOr<Version> rv = rvF.get();
const TraceEventFields& dbAvailableMsg = mDBAvailableF.get();
if (dbAvailableMsg.size() > 0) {
int64_t availableAtVersion = dbAvailableMsg.getInt64("AvailableAtVersion");
if (!rv.isError()) {
double lastRecoveredSecondsAgo = std::max((int64_t)0, (int64_t)(rv.get() - availableAtVersion)) / (double)SERVER_KNOBS->VERSIONS_PER_SECOND;
message["seconds_since_last_recovered"] = lastRecoveredSecondsAgo;
}
} else {
message["seconds_since_last_recovered"] = -1;
}
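For intuition (illustrative numbers; VERSIONS_PER_SECOND defaults to 1e6): if MasterRecoveryAvailable reports AvailableAtVersion = 5,000,000 and the current read version is 12,000,000, then seconds_since_last_recovered = max(0, 12000000 - 5000000) / 1e6 = 7.0. A missing MasterRecoveryAvailable event yields the -1 sentinel, while a failed read version simply omits the field.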
// Add additional metadata for certain statuses
if (mStatusCode == RecoveryStatus::recruiting_transaction_servers) {
int requiredLogs = atoi( md.getValue("RequiredTLogs").c_str() );
@ -1079,7 +1098,7 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(WorkerDetails
// TODO: time_in_recovery: 0.5
// time_in_state: 0.1
TraceEventFields mdActiveGens = wait(activeGens);
const TraceEventFields& mdActiveGens = mdActiveGensF.get();
if(mdActiveGens.size()) {
int activeGenerations = mdActiveGens.getInt("ActiveGenerations");
message["active_generations"] = activeGenerations;
@ -2473,7 +2492,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
// construct status information for cluster subsections
state int statusCode = (int) RecoveryStatus::END;
state JsonBuilderObject recoveryStateStatus = wait(recoveryStateStatusFetcher(mWorker, workers.size(), &status_incomplete_reasons, &statusCode));
state JsonBuilderObject recoveryStateStatus = wait(recoveryStateStatusFetcher(cx, mWorker, workers.size(), &status_incomplete_reasons, &statusCode));
// machine metrics
state WorkerEvents mMetrics = workerEventsVec[0].present() ? workerEventsVec[0].get().first : WorkerEvents();

View File

@ -1472,10 +1472,11 @@ public:
// If the page is still being read then it's not also being written because a write places
// the new content into readFuture when the write is launched, not when it is completed.
// Read/write ordering is being enforced waiting readers will not see the new write. This
// Read/write ordering is being enforced so waiting readers will not see the new write. This
// is necessary for remap erasure to work correctly since the oldest version of a page, located
// at the original page ID, could have a pending read when that version is expired and the write
// of the next newest version over top of the original page begins.
// at the original page ID, could have a pending read when that version is expired (after which
// future reads of the version are not allowed) and the write of the next newest version over top
// of the original page begins.
if (!cacheEntry.initialized()) {
cacheEntry.writeFuture = writePhysicalPage(pageID, data);
} else if (cacheEntry.reading()) {
@ -1718,7 +1719,7 @@ public:
secondType = RemappedPage::NONE;
} else {
secondType = RemappedPage::getTypeOf(nextEntry->second);
secondAfterOldestRetainedVersion = nextEntry->first >= oldestRetainedVersion;
secondAfterOldestRetainedVersion = nextEntry->first > oldestRetainedVersion;
}
} else {
ASSERT(iVersionPagePair->second == invalidLogicalPageID);
@ -1767,6 +1768,10 @@ public:
self->filename.c_str(), p.toString().c_str(), secondType, ::toString(*iVersionPagePair).c_str(), oldestRetainedVersion);
if(copyNewToOriginal) {
if(g_network->isSimulated()) {
ASSERT(self->remapDestinationsSimOnly.count(p.originalPageID) == 0);
self->remapDestinationsSimOnly.insert(p.originalPageID);
}
debug_printf("DWALPager(%s) remapCleanup copy %s\n", self->filename.c_str(), p.toString().c_str());
// Read the data from the page that the original was mapped to
@ -1829,7 +1834,8 @@ public:
state RemappedPage cutoff(oldestRetainedVersion - self->remapCleanupWindow);
// Minimum version we must pop to before obeying stop command.
state Version minStopVersion = cutoff.version - (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG);
state Version minStopVersion = cutoff.version - (BUGGIFY ? deterministicRandom()->randomInt(0, 10) : (self->remapCleanupWindow * SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_LAG));
self->remapDestinationsSimOnly.clear();
loop {
state Optional<RemappedPage> p = wait(self->remapQueue.pop(cutoff));
@ -2142,6 +2148,7 @@ private:
RemapQueueT remapQueue;
Version remapCleanupWindow;
std::unordered_set<PhysicalPageID> remapDestinationsSimOnly;
struct SnapshotEntry {
Version version;
@ -5683,12 +5690,13 @@ public:
: m_filePrefix(filePrefix), m_concurrentReads(new FlowLock(SERVER_KNOBS->REDWOOD_KVSTORE_CONCURRENT_READS)) {
// TODO: This constructor should really just take an IVersionedStore
int pageSize = BUGGIFY ? deterministicRandom()->randomInt(1000, 4096*4) : SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE;
int64_t pageCacheBytes = g_network->isSimulated()
? (BUGGIFY ? FLOW_KNOBS->BUGGIFY_SIM_PAGE_CACHE_4K : FLOW_KNOBS->SIM_PAGE_CACHE_4K)
? (BUGGIFY ? deterministicRandom()->randomInt(pageSize, FLOW_KNOBS->BUGGIFY_SIM_PAGE_CACHE_4K) : FLOW_KNOBS->SIM_PAGE_CACHE_4K)
: FLOW_KNOBS->PAGE_CACHE_4K;
Version remapCleanupWindow = BUGGIFY ? deterministicRandom()->randomInt64(0, 1000) : SERVER_KNOBS->REDWOOD_REMAP_CLEANUP_WINDOW;
IPager2* pager = new DWALPager(SERVER_KNOBS->REDWOOD_DEFAULT_PAGE_SIZE, filePrefix, pageCacheBytes, remapCleanupWindow);
IPager2* pager = new DWALPager(pageSize, filePrefix, pageCacheBytes, remapCleanupWindow);
m_tree = new VersionedBTree(pager, filePrefix);
m_init = catchError(init_impl(this));
}
@ -5738,8 +5746,6 @@ public:
KeyValueStoreType getType() const override { return KeyValueStoreType::SSD_REDWOOD_V1; }
bool canPipelineCommits() const override { return true; }
StorageBytes getStorageBytes() const override { return m_tree->getStorageBytes(); }
Future<Void> getError() { return delayed(m_error.getFuture()); };
@ -7237,7 +7243,7 @@ TEST_CASE("!/redwood/correctness/btree") {
shortTest ? 200 : (deterministicRandom()->coinflip() ? 4096 : deterministicRandom()->randomInt(200, 400));
state int64_t targetPageOps = shortTest ? 50000 : 1000000;
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .01);
state bool pagerMemoryOnly = shortTest && (deterministicRandom()->random01() < .001);
state int maxKeySize = deterministicRandom()->randomInt(1, pageSize * 2);
state int maxValueSize = randomSize(pageSize * 25);
state int maxCommitSize = shortTest ? 1000 : randomSize(std::min<int>((maxKeySize + maxValueSize) * 20000, 10e6));
@ -7246,10 +7252,9 @@ TEST_CASE("!/redwood/correctness/btree") {
state double clearPostSetProbability = deterministicRandom()->random01() * .1;
state double coldStartProbability = pagerMemoryOnly ? 0 : (deterministicRandom()->random01() * 0.3);
state double advanceOldVersionProbability = deterministicRandom()->random01();
state int64_t cacheSizeBytes =
pagerMemoryOnly ? 2e9 : (BUGGIFY ? deterministicRandom()->randomInt(1, 10 * pageSize) : 0);
state int64_t cacheSizeBytes = pagerMemoryOnly ? 2e9 : (pageSize * deterministicRandom()->randomInt(1, (BUGGIFY ? 2 : 10000) + 1));
state Version versionIncrement = deterministicRandom()->randomInt64(1, 1e8);
state Version remapCleanupWindow = deterministicRandom()->randomInt64(0, versionIncrement * 50);
state Version remapCleanupWindow = BUGGIFY ? 0 : deterministicRandom()->randomInt64(1, versionIncrement * 50);
state int maxVerificationMapEntries = 300e3;
printf("\n");

View File

@ -1277,6 +1277,7 @@ ACTOR Future<Void> trackTlogRecovery( Reference<MasterData> self, Reference<Asyn
TraceEvent("MasterRecoveryState", self->dbgid)
.detail("StatusCode", RecoveryStatus::fully_recovered)
.detail("Status", RecoveryStatus::names[RecoveryStatus::fully_recovered])
.detail("FullyRecoveredAtVersion", self->version)
.trackLatest("MasterRecoveryState");
TraceEvent("MasterRecoveryGenerations", self->dbgid)
@ -1702,6 +1703,10 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
.detail("RecoveryDuration", recoveryDuration)
.trackLatest("MasterRecoveryState");
TraceEvent("MasterRecoveryAvailable", self->dbgid)
.detail("AvailableAtVersion", self->version)
.trackLatest("MasterRecoveryAvailable");
if( self->resolvers.size() > 1 )
self->addActor.send( resolutionBalancing(self) );

View File

@ -152,11 +152,10 @@ struct ShardInfo : ReferenceCounted<ShardInfo>, NonCopyable {
};
struct StorageServerDisk {
explicit StorageServerDisk( struct StorageServer* data, IKeyValueStore* storage ) : data(data), storage(storage), _canPipelineCommits(storage->canPipelineCommits()) {}
explicit StorageServerDisk(struct StorageServer* data, IKeyValueStore* storage) : data(data), storage(storage) {}
void makeNewStorageServerDurable();
// Asynchronously move data from the mutation log into the SE's commit buffer for the next commit.
Future<bool> asyncPrepareVersionsForCommit(Version desiredOldestVersion, Future<Void> durable, Future<Void> durableMinDelay);
bool makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion, int64_t& bytesLeft);
void makeVersionDurable( Version version );
Future<bool> restoreDurableState();
@ -180,14 +179,11 @@ struct StorageServerDisk {
StorageBytes getStorageBytes() const { return storage->getStorageBytes(); }
std::tuple<size_t, size_t, size_t> getSize() const { return storage->getSize(); }
bool canPipelineCommits() const {return _canPipelineCommits;}
void set(KeyValueRef kv) { storage->set(kv);}
void clear(KeyRangeRef kr) { storage->clear(kr);}
private:
struct StorageServer* data;
IKeyValueStore* storage;
const bool _canPipelineCommits;
void writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion, const char* debugContext);
ACTOR static Future<Key> readFirstKey( IKeyValueStore* storage, KeyRangeRef range ) {
Standalone<RangeResultRef> r = wait( storage->readRange( range, 1 ) );
@ -3091,67 +3087,79 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
wait( delay(0, TaskPriority::UpdateStorage) );
state Promise<Void> durableInProgress;
data->durableInProgress = durableInProgress.getFuture();
state Version desiredOldestVersion = data->desiredOldestVersion.get();
state Version startOldestVersion = data->storageVersion();
state Version newOldestVersion = data->storageVersion();
state Version desiredVersion = data->desiredOldestVersion.get();
state int64_t bytesLeft = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
state Future<Void> durableMinDelay = Void();
state Future<Void> durable = Void();
state int64_t ssCommitQuotaBytes;
state Version pendingCommitVersion;
state int64_t bytesWritten = 0;
state bool finalCommit = false;
state bool done = false;
// Write mutations to storage until we reach the desiredVersion or have written too much (bytesLeft)
loop {
// Keep making data from the mutation log durable until no data remains whose version is <= desiredOldestVersion
pendingCommitVersion = data->storageVersion();
ssCommitQuotaBytes = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
durableInProgress.reset();
data->durableInProgress = durableInProgress.getFuture();
durable = data->storage.commit(); // Commit data up to(inclusive) version pendingCommitVersion
durableMinDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
if (finalCommit) {
wait(durable && durableMinDelay);
done = true;
} else {
// Move data starting from pendingCommitVersion+1 into the SE's commit buffer.
bool _finalCommit = wait(data->storage.asyncPrepareVersionsForCommit(desiredOldestVersion, durable, durableMinDelay));
finalCommit = _finalCommit;
}
debug_advanceMinCommittedVersion( data->thisServerID, pendingCommitVersion );
if(pendingCommitVersion > data->rebootAfterDurableVersion) {
TraceEvent("RebootWhenDurableTriggered", data->thisServerID).detail("PendingCommitVersion", pendingCommitVersion).detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
// To avoid a brokenPromise error, which is caused by the sender of durableInProgress (i.e., this process)
// never setting durableInProgress, we should set durableInProgress before sending the please_reboot() error.
// Otherwise, in the race situation when storage server receives both reboot and
// brokenPromise of durableInProgress, the worker of the storage server will die.
// We will eventually end up with no worker for storage server role.
// The data distributor's buildTeam() will get stuck in building a team
durableInProgress.sendError(please_reboot());
throw please_reboot();
}
durableInProgress.send(Void());
wait( delay(0, TaskPriority::UpdateStorage) ); //Setting durableInProgress could cause the storage server to shut down, so delay to check for cancellation
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was effective and
// are applied after we change the durable version. Also ensure that we have to lock while calling changeDurableVersion,
// because otherwise the latest version of mutableData might be partially loaded.
wait( data->durableVersionLock.take() );
data->popVersion( data->durableVersion.get() + 1 );
// Update durableVersion to pendingCommitVersion, which has been committed.
while (!changeDurableVersion( data, pendingCommitVersion )) {
if(g_network->check_yield(TaskPriority::UpdateStorage)) {
data->durableVersionLock.release();
wait(delay(0, TaskPriority::UpdateStorage));
wait( data->durableVersionLock.take() );
}
}
data->durableVersionLock.release();
state bool done = data->storage.makeVersionMutationsDurable(newOldestVersion, desiredVersion, bytesLeft);
// We want to forget things from these data structures atomically with changing oldestVersion (and "before",
// since oldestVersion.set() may trigger waiting actors) forgetVersionsBeforeAsync visibly forgets
// immediately (without waiting) but asynchronously frees memory.
Future<Void> finishedForgetting =
data->mutableData().forgetVersionsBeforeAsync(newOldestVersion, TaskPriority::UpdateStorage);
data->oldestVersion.set(newOldestVersion);
wait(finishedForgetting);
wait(yield(TaskPriority::UpdateStorage));
if (done) break;
}
// Set the new durable version as part of the outstanding change set, before commit
if (startOldestVersion != newOldestVersion) data->storage.makeVersionDurable(newOldestVersion);
debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
state Future<Void> durable = data->storage.commit();
state Future<Void> durableDelay = Void();
if (bytesLeft > 0) {
durableDelay = delay(SERVER_KNOBS->STORAGE_COMMIT_INTERVAL, TaskPriority::UpdateStorage);
}
wait(durable);
debug_advanceMinCommittedVersion(data->thisServerID, newOldestVersion);
if (newOldestVersion > data->rebootAfterDurableVersion) {
TraceEvent("RebootWhenDurableTriggered", data->thisServerID)
.detail("NewOldestVersion", newOldestVersion)
.detail("RebootAfterDurableVersion", data->rebootAfterDurableVersion);
// To avoid a brokenPromise error, which is caused by the sender of durableInProgress (i.e., this process)
// never setting durableInProgress, we should set durableInProgress before sending the please_reboot() error.
// Otherwise, in the race situation when storage server receives both reboot and
// brokenPromise of durableInProgress, the worker of the storage server will die.
// We will eventually end up with no worker for storage server role.
// The data distributor's buildTeam() will get stuck in building a team
durableInProgress.sendError(please_reboot());
throw please_reboot();
}
durableInProgress.send(Void());
wait(delay(0, TaskPriority::UpdateStorage)); // Setting durableInProgress could cause the storage server to shut
// down, so delay to check for cancellation
// Taking and releasing the durableVersionLock ensures that no eager reads both begin before the commit was
// effective and are applied after we change the durable version. Also ensure that we have to lock while calling
// changeDurableVersion, because otherwise the latest version of mutableData might be partially loaded.
wait(data->durableVersionLock.take());
data->popVersion(data->durableVersion.get() + 1);
while (!changeDurableVersion(data, newOldestVersion)) {
if (g_network->check_yield(TaskPriority::UpdateStorage)) {
data->durableVersionLock.release();
wait(delay(0, TaskPriority::UpdateStorage));
wait(data->durableVersionLock.take());
}
}
data->durableVersionLock.release();
//TraceEvent("StorageServerDurable", data->thisServerID).detail("Version", newOldestVersion);
wait(durableDelay);
}
}
@ -3224,101 +3232,37 @@ void StorageServerDisk::writeMutation( MutationRef mutation ) {
ASSERT(false);
}
ACTOR Future<int64_t> asyncWriteMutationsToCommitBuffer(StorageServer* data, VectorRef<MutationRef> mutations, Version debugVersion, const char* debugContext, int64_t ssCommitQuotaBytes) {
state int bytesWritten = 0;
state int i = 0;
for (;i < mutations.size(); i++) {
const auto& m = mutations[i];
void StorageServerDisk::writeMutations(const VectorRef<MutationRef>& mutations, Version debugVersion,
const char* debugContext) {
for (const auto& m : mutations) {
DEBUG_MUTATION(debugContext, debugVersion, m).detail("UID", data->thisServerID);
if (m.type == MutationRef::SetValue) {
data->storage.set(KeyValueRef(m.param1, m.param2));
storage->set(KeyValueRef(m.param1, m.param2));
} else if (m.type == MutationRef::ClearRange) {
data->storage.clear(KeyRangeRef(m.param1, m.param2));
}
auto mutationBytes = mvccStorageBytes(m);
bytesWritten += mutationBytes;
ssCommitQuotaBytes -= mutationBytes;
if (data->storage.canPipelineCommits() && bytesWritten >= SERVER_KNOBS->STORAGE_COMMIT_PIPELINE_BYTES_PER_YIELD) {
bytesWritten = 0;
wait(yield());
storage->clear(KeyRangeRef(m.param1, m.param2));
}
}
return ssCommitQuotaBytes;
}
ACTOR Future<bool> asyncPrepareVersionsForCommit_impl(StorageServerDisk* self, StorageServer* data, Version desiredOldestVersion, Future<Void> durable, Future<Void> durableMinDelay) {
state int64_t ssCommitQuotaBytes = SERVER_KNOBS->STORAGE_COMMIT_BYTES;
state bool finalCommit = false;
state Version startOldestVersion = data->storageVersion();
state Version newOldestVersion = data->storageVersion();
state SignalableActorCollection forgetter;
loop {
// While committing previously written data, keep writing new data from later versions until
// 1.) commit is done, or
// 2.) ssCommitQuotaBytes <= 0, or
// 3.) no data in mutation log to write.
if (!data->storage.canPipelineCommits()) {
// Don't write version data while a commit is going on if the storage engine does not support pipelining
wait(durable && durableMinDelay);
}
state Future<Void> stopEarly = data->storage.canPipelineCommits() ? (durable && durableMinDelay) : Never();
// Apply mutations from the mutationLog
auto u = data->getMutationLog().upper_bound(newOldestVersion);
if (u != data->getMutationLog().end() && u->first <= desiredOldestVersion) {
VerUpdateRef const& v = u->second;
newOldestVersion = v.version;
ASSERT( newOldestVersion > data->storageVersion() && newOldestVersion <= desiredOldestVersion );
// TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("asyncPrepareVersionsForCommit", newOldestVersion, KeyRangeRef());
int64_t _ssCommitQuotaBytes = wait(asyncWriteMutationsToCommitBuffer(data, v.mutations, newOldestVersion, "asyncPrepareVersionsForCommit", ssCommitQuotaBytes));
ssCommitQuotaBytes = _ssCommitQuotaBytes;
bool StorageServerDisk::makeVersionMutationsDurable(Version& prevStorageVersion, Version newStorageVersion,
int64_t& bytesLeft) {
if (bytesLeft <= 0) return true;
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
forgetter.add(data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage ));
data->oldestVersion.set( newOldestVersion );
if (ssCommitQuotaBytes <= 0) {
// No quota left. Wait for previous commit to finish.
wait(durable && durableMinDelay);
break;
}
if (stopEarly.isReady()) {
// Previous commit is done.
if (stopEarly.isError()) {
throw stopEarly.getError();
}
break;
}
} else {
// Since there is no data in the mutation log, in order to make progress,
// advance it to desiredOldestVersion directly
newOldestVersion = desiredOldestVersion;
// We want to forget things from these data structures atomically with changing oldestVersion (and "before", since oldestVersion.set() may trigger waiting actors)
// forgetVersionsBeforeAsync visibly forgets immediately (without waiting) but asynchronously frees memory.
forgetter.add(data->mutableData().forgetVersionsBeforeAsync( newOldestVersion, TaskPriority::UpdateStorage ));
data->oldestVersion.set( newOldestVersion );
// No more data in mutation log can be written.
finalCommit = true;
// Wait for the previously written data to be committed
wait(durable && durableMinDelay);
break;
}
// Apply mutations from the mutationLog
auto u = data->getMutationLog().upper_bound(prevStorageVersion);
if (u != data->getMutationLog().end() && u->first <= newStorageVersion) {
VerUpdateRef const& v = u->second;
ASSERT(v.version > prevStorageVersion && v.version <= newStorageVersion);
// TODO(alexmiller): Update to version tracking.
DEBUG_KEY_RANGE("makeVersionMutationsDurable", v.version, KeyRangeRef());
writeMutations(v.mutations, v.version, "makeVersionDurable");
for (const auto& m : v.mutations) bytesLeft -= mvccStorageBytes(m);
prevStorageVersion = v.version;
return false;
} else {
prevStorageVersion = newStorageVersion;
return true;
}
if (newOldestVersion > startOldestVersion){
// Set the new durable version as part of the outstanding change set, before commit
data->storage.makeVersionDurable( newOldestVersion );
}
debug_advanceMaxCommittedVersion(data->thisServerID, newOldestVersion);
wait(forgetter.signal());
return finalCommit;
}
Future<bool> StorageServerDisk::asyncPrepareVersionsForCommit(Version desiredOldestVersion, Future<Void> durable, Future<Void> durableMinDelay) {
return asyncPrepareVersionsForCommit_impl(this, data, desiredOldestVersion, durable, durableMinDelay);
}
// Update data->storage to persist the changes from (data->storageVersion(),version]
@ -4059,7 +4003,7 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
loop {
state Future<Void> infoChanged = self->db->onChange();
state Reference<CommitProxyInfo> commitProxies(new CommitProxyInfo(self->db->get().client.commitProxies));
state Reference<CommitProxyInfo> commitProxies(new CommitProxyInfo(self->db->get().client.commitProxies, false));
choose {
when(GetStorageServerRejoinInfoReply _rep =
wait(commitProxies->size()

View File

@ -423,7 +423,11 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
for (size_t j = 0; j < key_size; ++j)
skey.append(1, (char) deterministicRandom()->randomInt(0, 256));
return Key(skey);
// 15% (= 20% * 75%) of the time generating keys after \xff\xff to test special keys code
if (deterministicRandom()->random01() < 0.2)
return Key(skey).withPrefix(specialKeys.begin);
else
return Key(skey);
}
static Value makeValue() {
@ -672,7 +676,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
limit = deterministicRandom()->randomInt(0, INT_MAX)+1;
}
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && specialKeys.begin <= keysel2.getKey() &&
keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair(error_code_range_limits_invalid, ExceptionContract::possibleButRequiredIf(limit < 0)),
@ -710,7 +715,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
keysel2 = makeKeySel();
limits = makeRangeLimits();
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && keysel2.getKey() <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(keysel1.getKey()) && specialKeys.begin <= keysel2.getKey() &&
keysel2.getKey() <= specialKeys.end;
contract = {
std::make_pair(error_code_range_limits_invalid,
@ -760,7 +766,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
limit = deterministicRandom()->randomInt(0, INT_MAX)+1;
}
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(key1) && specialKeys.begin <= key2 && key2 <= specialKeys.end;
contract = {
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),
@ -800,7 +806,7 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
key2 = makeKey();
limits = makeRangeLimits();
bool isSpecialKeyRange = specialKeys.contains(key1) && key2 <= specialKeys.end;
bool isSpecialKeyRange = specialKeys.contains(key1) && specialKeys.begin <= key2 && key2 <= specialKeys.end;
contract = {
std::make_pair(error_code_inverted_range, ExceptionContract::requiredIf(key1 > key2)),

View File

@ -0,0 +1,117 @@
/*
* RestoreBackup.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct RestoreBackupWorkload final : TestWorkload {
FileBackupAgent backupAgent;
Reference<IBackupContainer> backupContainer;
Standalone<StringRef> backupDir;
Standalone<StringRef> tag;
double delayFor;
bool stopWhenDone;
RestoreBackupWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
backupDir = getOption(options, LiteralStringRef("backupDir"), LiteralStringRef("file://simfdb/backups/"));
tag = getOption(options, LiteralStringRef("tag"), LiteralStringRef("default"));
delayFor = getOption(options, LiteralStringRef("delayFor"), 10.0);
stopWhenDone = getOption(options, LiteralStringRef("stopWhenDone"), false);
}
static constexpr const char* DESCRIPTION = "RestoreBackup";
ACTOR static Future<Void> waitOnBackup(RestoreBackupWorkload* self, Database cx) {
state Version waitForVersion;
state UID backupUID;
state Transaction tr(cx);
loop {
try {
Version v = wait(tr.getReadVersion());
waitForVersion = v;
break;
} catch (Error& e) {
wait(tr.onError(e));
}
}
EBackupState backupState = wait(self->backupAgent.waitBackup(cx, self->tag.toString(), self->stopWhenDone,
&self->backupContainer, &backupUID));
if (backupState == EBackupState::STATE_COMPLETED) {
return Void();
} else if (backupState == EBackupState::STATE_RUNNING_DIFFERENTIAL) {
ASSERT(!self->stopWhenDone);
loop {
BackupDescription desc = wait(self->backupContainer->describeBackup(true));
TraceEvent("BackupVersionGate")
.detail("MaxLogEndVersion", desc.maxLogEnd.present() ? desc.maxLogEnd.get() : invalidVersion)
.detail("ContiguousLogEndVersion",
desc.contiguousLogEnd.present() ? desc.contiguousLogEnd.get() : invalidVersion)
.detail("TargetVersion", waitForVersion);
if (desc.contiguousLogEnd.present() && desc.contiguousLogEnd.get() >= waitForVersion) {
wait(self->backupAgent.discontinueBackup(cx, self->tag));
return Void();
}
wait(delay(5.0));
}
} else {
TraceEvent(SevError, "BadBackupState").detail("BackupState", BackupAgentBase::getStateText(backupState));
ASSERT(false);
return Void();
}
}
ACTOR static Future<Void> clearDatabase(Database cx) {
// TODO: Batch to avoid large clear ranges?
state Transaction tr(cx);
loop {
try {
tr.clear(normalKeys);
wait(tr.commit());
return Void();
} catch (Error& e) {
wait(tr.onError(e));
}
}
}
ACTOR static Future<Void> _start(RestoreBackupWorkload* self, Database cx) {
wait(delay(self->delayFor));
wait(waitOnBackup(self, cx));
wait(clearDatabase(cx));
wait(success(self->backupAgent.restore(cx, cx, self->tag, Key(self->backupContainer->getURL()), true,
::invalidVersion, true)));
return Void();
}
std::string description() const override { return DESCRIPTION; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(this, cx); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(vector<PerfMetric>& m) override {}
};
WorkloadFactory<RestoreBackupWorkload> RestoreBackupWorkloadFactory(RestoreBackupWorkload::DESCRIPTION);

View File

@ -420,6 +420,18 @@ struct SpecialKeySpaceCorrectnessWorkload : TestWorkload {
ASSERT(e.code() == error_code_special_keys_cross_module_clear);
tx->reset();
}
// If the base key of the end key selector is not in (\xff\xff, \xff\xff\xff), throw key_outside_legal_range()
try {
const KeySelector startKeySelector = KeySelectorRef(LiteralStringRef("\xff\xff/test"), true, -200);
const KeySelector endKeySelector = KeySelectorRef(LiteralStringRef("test"), true, -10);
Standalone<RangeResultRef> result =
wait(tx->getRange(startKeySelector, endKeySelector, GetRangeLimits(CLIENT_KNOBS->TOO_MANY)));
ASSERT(false);
} catch (Error& e) {
if (e.code() == error_code_actor_cancelled) throw;
ASSERT(e.code() == error_code_key_outside_legal_range);
tx->reset();
}
return Void();
}

View File

@ -0,0 +1,74 @@
/*
* SubmitBackup.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbclient/FDBTypes.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/BackupContainer.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct SubmitBackupWorkload final : TestWorkload {
FileBackupAgent backupAgent;
Standalone<StringRef> backupDir;
Standalone<StringRef> tag;
double delayFor;
int snapshotInterval;
bool stopWhenDone;
bool incremental;
SubmitBackupWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
backupDir = getOption(options, LiteralStringRef("backupDir"), LiteralStringRef("file://simfdb/backups/"));
tag = getOption(options, LiteralStringRef("tag"), LiteralStringRef("default"));
delayFor = getOption(options, LiteralStringRef("delayFor"), 10.0);
snapshotInterval = getOption(options, LiteralStringRef("snapshotInterval"), 1e8);
stopWhenDone = getOption(options, LiteralStringRef("stopWhenDone"), true);
incremental = getOption(options, LiteralStringRef("incremental"), false);
}
static constexpr const char* DESCRIPTION = "SubmitBackup";
ACTOR static Future<Void> _start(SubmitBackupWorkload* self, Database cx) {
wait(delay(self->delayFor));
Standalone<VectorRef<KeyRangeRef>> backupRanges;
backupRanges.push_back_deep(backupRanges.arena(), normalKeys);
try {
wait(self->backupAgent.submitBackup(cx, self->backupDir, self->snapshotInterval, self->tag.toString(),
backupRanges, self->stopWhenDone, false, self->incremental));
} catch (Error& e) {
TraceEvent("BackupSubmitError").error(e);
if (e.code() != error_code_backup_duplicate) {
throw;
}
}
return Void();
}
std::string description() const override { return DESCRIPTION; }
Future<Void> setup(Database const& cx) override { return Void(); }
Future<Void> start(Database const& cx) override { return clientId ? Void() : _start(this, cx); }
Future<bool> check(Database const& cx) override { return true; }
void getMetrics(vector<PerfMetric>& m) override {}
};
WorkloadFactory<SubmitBackupWorkload> SubmitBackupWorkloadFactory(SubmitBackupWorkload::DESCRIPTION);

View File

@ -162,7 +162,17 @@ struct VersionStampWorkload : TestWorkload {
state ReadYourWritesTransaction tr(cx);
// We specifically wish to grab the smallest read version that we can get and maintain it, to
// have the strictest possible check that versionstamps increase monotonically.
state Version readVersion = wait(tr.getReadVersion());
state Version readVersion;
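// Standard retry loop: tr.onError() handles transient errors (backing off as needed) before we retry getReadVersion().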
loop {
try {
Version _readVersion = wait(tr.getReadVersion());
readVersion = _readVersion;
break;
}
catch (Error& e) {
wait(tr.onError(e));
}
}
if(BUGGIFY) {
if(deterministicRandom()->random01() < 0.5) {

View File

@ -107,14 +107,14 @@ ErrorCodeTable& Error::errorCodeTable() {
}
const char* Error::name() const {
auto table = errorCodeTable();
const auto& table = errorCodeTable();
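// (binding by const reference avoids copying the entire error-code table on every lookup)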
auto it = table.find(error_code);
if (it == table.end()) return "UNKNOWN_ERROR";
return it->second.first;
}
const char* Error::what() const {
auto table = errorCodeTable();
const auto& table = errorCodeTable();
auto it = table.find(error_code);
if (it == table.end()) return "UNKNOWN_ERROR";
return it->second.second;

View File

@ -503,6 +503,8 @@ public:
Map(Map&& r) noexcept : set(std::move(r.set)) {}
void operator=(Map&& r) noexcept { set = std::move(r.set); }
Future<Void> clearAsync();
private:
Map( Map<Key,Value,Pair> const& ); // unimplemented
void operator=( Map<Key,Value,Pair> const& ); // unimplemented
@ -1311,4 +1313,9 @@ Future<Void> IndexedSet<T, Metric>::eraseAsync(typename IndexedSet<T,Metric>::it
return uncancellable(ISFreeNodes(toFree, false));
}
template <class Key, class Value, class Pair, class Metric>
Future<Void> Map<Key, Value, Pair, Metric>::clearAsync() {
return set.eraseAsync(set.begin(), set.end());
}
#endif
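A minimal usage sketch of the new clearAsync() (hypothetical element types, not part of this commit): it erases every entry while handing node deallocation to an uncancellable background future, per the eraseAsync() implementation above.

```cpp
// Sketch, assuming a Map as defined in this header.
Map<int, double> samples;
// ... insert many entries ...
Future<Void> freed = samples.clearAsync(); // entries are erased; node memory
                                           // is reclaimed when `freed` is ready
```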

View File

@ -84,6 +84,8 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
init( PEER_UNAVAILABLE_FOR_LONG_TIME_TIMEOUT, 3600.0 );
init( INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING, 5.0 );
init( PING_LOGGING_INTERVAL, 3.0 );
init( PING_SAMPLE_AMOUNT, 100 );
init( TLS_CERT_REFRESH_DELAY_SECONDS, 12*60*60 );
init( TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT, 9.0 );
@ -216,7 +218,10 @@ void FlowKnobs::initialize(bool randomize, bool isSimulated) {
init( BASIC_LOAD_BALANCE_UPDATE_RATE, 10.0 ); //should be longer than the rate we log network metrics
init( BASIC_LOAD_BALANCE_MAX_CHANGE, 0.10 );
init( BASIC_LOAD_BALANCE_MAX_PROB, 2.0 );
init( BASIC_LOAD_BALANCE_MIN_AMOUNT, 50000 ); //Will not update probabilities if the average proxy busyness is less than 5%
init( BASIC_LOAD_BALANCE_MIN_REQUESTS, 20 ); //do not adjust LB probabilities if the proxies are releasing fewer than 20 transactions per second
init( BASIC_LOAD_BALANCE_MIN_CPU, 0.05 ); //do not adjust LB probabilities if the proxies are less than 5% utilized
init( BASIC_LOAD_BALANCE_BUCKETS, 40 ); //proxies bin recent GRV requests into 40 time bins
init( BASIC_LOAD_BALANCE_COMPUTE_PRECISION, 10000 ); //precision of the proxy CPU-usage measurement used by load balancing
// Health Monitor
init( FAILURE_DETECTION_DELAY, 4.0 ); if( randomize && BUGGIFY ) FAILURE_DETECTION_DELAY = 1.0;

View File

@ -98,6 +98,8 @@ public:
double ALWAYS_ACCEPT_DELAY;
int ACCEPT_BATCH_SIZE;
double INCOMPATIBLE_PEER_DELAY_BEFORE_LOGGING;
double PING_LOGGING_INTERVAL;
int PING_SAMPLE_AMOUNT;
int TLS_CERT_REFRESH_DELAY_SECONDS;
double TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT;
@ -235,7 +237,10 @@ public:
double BASIC_LOAD_BALANCE_UPDATE_RATE;
double BASIC_LOAD_BALANCE_MAX_CHANGE;
double BASIC_LOAD_BALANCE_MAX_PROB;
double BASIC_LOAD_BALANCE_MIN_AMOUNT;
int BASIC_LOAD_BALANCE_BUCKETS;
int BASIC_LOAD_BALANCE_COMPUTE_PRECISION;
double BASIC_LOAD_BALANCE_MIN_REQUESTS;
double BASIC_LOAD_BALANCE_MIN_CPU;
// Health Monitor
int FAILURE_DETECTION_DELAY;

View File

@ -629,30 +629,10 @@ public:
init();
}
ACTOR static void doAcceptHandshake( Reference<SSLConnection> self, Promise<Void> connected) {
state std::pair<IPAddress,uint16_t> peerIP;
ACTOR static void doAcceptHandshake( Reference<SSLConnection> self, Promise<Void> connected ) {
state Hold<int> holder;
try {
peerIP = std::make_pair(self->getPeerAddress().ip, static_cast<uint16_t>(0));
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
if (now() < iter->second.second) {
if(iter->second.first >= FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS) {
TraceEvent("TLSIncomingConnectionThrottlingWarning").suppressFor(1.0).detail("PeerIP", peerIP.first.toString());
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT));
self->closeSocket();
connected.sendError(connection_failed());
return;
}
} else {
g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP);
}
}
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Future<Void> onHandshook;
// If the background handshakers are not all busy, use one
@ -672,24 +652,51 @@ public:
wait(delay(0, TaskPriority::Handshake));
connected.send(Void());
} catch (...) {
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT);
}
self->closeSocket();
connected.sendError(connection_failed());
}
}
ACTOR static Future<Void> acceptHandshakeWrapper( Reference<SSLConnection> self ) {
state std::pair<IPAddress,uint16_t> peerIP;
peerIP = std::make_pair(self->getPeerAddress().ip, static_cast<uint16_t>(0));
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
if (now() < iter->second.second) {
if(iter->second.first >= FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_ATTEMPTS) {
TraceEvent("TLSIncomingConnectionThrottlingWarning").suppressFor(1.0).detail("PeerIP", peerIP.first.toString());
wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT));
self->closeSocket();
throw connection_failed();
}
} else {
g_network->networkInfo.serverTLSConnectionThrottler.erase(peerIP);
}
}
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Promise<Void> connected;
doAcceptHandshake(self, connected);
try {
wait(connected.getFuture());
return Void();
choose {
when(wait(connected.getFuture())) {
return Void();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
throw connection_failed();
}
}
} catch (Error& e) {
if(e.code() != error_code_actor_cancelled) {
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_SERVER_CONNECTION_THROTTLE_TIMEOUT);
}
}
// Either the connection failed, or was cancelled by the caller
self->closeSocket();
throw;
@ -704,9 +711,6 @@ public:
state Hold<int> holder;
try {
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Future<Void> onHandshook;
// If the background handshakers are not all busy, use one
if(N2::g_net2->sslPoolHandshakesInProgress < N2::g_net2->sslHandshakerThreadsStarted) {
@ -725,26 +729,37 @@ public:
wait(delay(0, TaskPriority::Handshake));
connected.send(Void());
} catch (...) {
std::pair<IPAddress,uint16_t> peerIP = std::make_pair(self->peer_address.ip, self->peer_address.port);
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT);
}
self->closeSocket();
connected.sendError(connection_failed());
}
}
ACTOR static Future<Void> connectHandshakeWrapper( Reference<SSLConnection> self ) {
wait(g_network->networkInfo.handshakeLock->take());
state FlowLock::Releaser releaser(*g_network->networkInfo.handshakeLock);
Promise<Void> connected;
doConnectHandshake(self, connected);
try {
wait(connected.getFuture());
return Void();
choose {
when(wait(connected.getFuture())) {
return Void();
}
when(wait(delay(FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT))) {
throw connection_failed();
}
}
} catch (Error& e) {
// Either the connection failed, or was cancelled by the caller
if(e.code() != error_code_actor_cancelled) {
std::pair<IPAddress,uint16_t> peerIP = std::make_pair(self->peer_address.ip, self->peer_address.port);
auto iter(g_network->networkInfo.serverTLSConnectionThrottler.find(peerIP));
if(iter != g_network->networkInfo.serverTLSConnectionThrottler.end()) {
iter->second.first++;
} else {
g_network->networkInfo.serverTLSConnectionThrottler[peerIP] = std::make_pair(0,now() + FLOW_KNOBS->TLS_CLIENT_CONNECTION_THROTTLE_TIMEOUT);
}
}
self->closeSocket();
throw;
}
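Both wrappers now share the same shape: take the handshake lock, kick off the handshake actor, and race its completion against CONNECTION_MONITOR_TIMEOUT. A minimal standalone sketch of that race (hypothetical actor name; assumes the Flow actor compiler):

```cpp
// Sketch of the choose/when timeout pattern used in both handshake wrappers.
ACTOR Future<Void> raceAgainstTimeout(Future<Void> handshake, double timeoutSeconds) {
	choose {
		when(wait(handshake)) { return Void(); } // handshake completed in time
		when(wait(delay(timeoutSeconds))) {
			throw connection_failed(); // timer fired first: treat as a failed connection
		}
	}
}
```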

View File

@ -163,6 +163,7 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
}
for (auto const& itr : loggedDurations) {
// PriorityBusyX measures the amount of time spent busy at exactly priority X
n.detail(format("PriorityBusy%d", itr.first).c_str(), itr.second);
}
@ -174,6 +175,9 @@ SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *sta
itr.windowedTimer = now();
}
// PriorityStarvedBelowX: how much of the elapsed time was spent running tasks at or above priority X
// (i.e., time during which tasks below priority X were starved)
// PriorityMaxStarvedBelowX: the longest single span of time starved below that priority,
// which can tell you whether work is being done in bursts
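// e.g. (hypothetical numbers) PriorityMaxStarvedBelow2000 = 1.5 means some single 1.5s stretch ran only tasks at priority >= 2000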
n.detail(format("PriorityStarvedBelow%d", itr.priority).c_str(), std::min(currentStats.elapsed, itr.duration));
n.detail(format("PriorityMaxStarvedBelow%d", itr.priority).c_str(), itr.maxDuration);

View File

@ -89,6 +89,7 @@ struct SuppressionMap {
}
};
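// TraceBatch events are implicitly logged at this severity; the early returns below skip
// buffering and dumping them when MIN_TRACE_SEVERITY is higher.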
#define TRACE_BATCH_IMPLICIT_SEVERITY SevInfo
TraceBatch g_traceBatch;
std::atomic<trace_clock_t> g_trace_clock{ TRACE_CLOCK_NOW };
@ -1140,6 +1141,9 @@ bool TraceBatch::dumpImmediately() {
}
void TraceBatch::addEvent( const char *name, uint64_t id, const char *location ) {
if(FLOW_KNOBS->MIN_TRACE_SEVERITY > TRACE_BATCH_IMPLICIT_SEVERITY) {
return;
}
auto& eventInfo = eventBatch.emplace_back(EventInfo(TraceEvent::getCurrentTime(), name, id, location));
if (dumpImmediately())
dump();
@ -1148,6 +1152,9 @@ void TraceBatch::addEvent( const char *name, uint64_t id, const char *location )
}
void TraceBatch::addAttach( const char *name, uint64_t id, uint64_t to ) {
if(FLOW_KNOBS->MIN_TRACE_SEVERITY > TRACE_BATCH_IMPLICIT_SEVERITY) {
return;
}
auto& attachInfo = attachBatch.emplace_back(AttachInfo(TraceEvent::getCurrentTime(), name, id, to));
if (dumpImmediately())
dump();
@ -1156,6 +1163,9 @@ void TraceBatch::addAttach( const char *name, uint64_t id, uint64_t to ) {
}
void TraceBatch::addBuggify( int activated, int line, std::string file ) {
if(FLOW_KNOBS->MIN_TRACE_SEVERITY > TRACE_BATCH_IMPLICIT_SEVERITY) {
return;
}
if( g_network ) {
auto& buggifyInfo = buggifyBatch.emplace_back(BuggifyInfo(TraceEvent::getCurrentTime(), activated, line, file));
if (dumpImmediately())
@ -1168,7 +1178,7 @@ void TraceBatch::addBuggify( int activated, int line, std::string file ) {
}
void TraceBatch::dump() {
if (!g_traceLog.isOpen())
if (!g_traceLog.isOpen() || FLOW_KNOBS->MIN_TRACE_SEVERITY > TRACE_BATCH_IMPLICIT_SEVERITY)
return;
std::string machine;
if(g_network->isSimulated()) {
@ -1204,7 +1214,7 @@ void TraceBatch::dump() {
}
TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, const char *location) {
fields.addField("Severity", format("%d", (int)SevInfo));
fields.addField("Severity", format("%d", (int)TRACE_BATCH_IMPLICIT_SEVERITY));
fields.addField("Time", format("%.6f", time));
fields.addField("Type", name);
fields.addField("ID", format("%016" PRIx64, id));
@ -1212,7 +1222,7 @@ TraceBatch::EventInfo::EventInfo(double time, const char *name, uint64_t id, con
}
TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, uint64_t to) {
fields.addField("Severity", format("%d", (int)SevInfo));
fields.addField("Severity", format("%d", (int)TRACE_BATCH_IMPLICIT_SEVERITY));
fields.addField("Time", format("%.6f", time));
fields.addField("Type", name);
fields.addField("ID", format("%016" PRIx64, id));
@ -1220,7 +1230,7 @@ TraceBatch::AttachInfo::AttachInfo(double time, const char *name, uint64_t id, u
}
TraceBatch::BuggifyInfo::BuggifyInfo(double time, int activated, int line, std::string file) {
fields.addField("Severity", format("%d", (int)SevInfo));
fields.addField("Severity", format("%d", (int)TRACE_BATCH_IMPLICIT_SEVERITY));
fields.addField("Time", format("%.6f", time));
fields.addField("Type", "BuggifySection");
fields.addField("Activated", format("%d", activated));

View File

@ -347,7 +347,7 @@ struct NetworkMetrics {
static const std::vector<int> starvationBins;
NetworkMetrics() {
NetworkMetrics() : lastRunLoopBusyness(0) {
for(int priority : starvationBins) {
starvationTrackers.emplace_back(static_cast<TaskPriority>(priority));
}

View File

@ -20,7 +20,8 @@
COMPOSE_PROJECT_NAME=fdbpythonsample
FDB_API_VERSION=620
FDB_VERSION=6.2.11
FDB_VERSION=6.2.26
FDB_COORDINATOR=fdb-coordinator
FDB_NETWORKING_MODE=container
FDB_COORDINATOR_PORT=4500
FDB_ADDITIONAL_VERSIONS=""

View File

@ -11,13 +11,14 @@ docker-compose up -d
```
This will start:
* 1 coordinator,
* 2 fdbservers,
* 1 Flask application.
* 1 coordinator
* 2 fdbservers
* 1 Flask application
The Flask application can be accessed using curl:
```sh
```bash
# retrieve counter
curl http://0.0.0.0:5000/counter # 0
@ -29,7 +30,6 @@ curl -X POST http://0.0.0.0:5000/counter/increment # 2
curl http://0.0.0.0:5000/counter # 2
```
## Access the FoundationDB cluster
If you want to access the cluster from your machine, here's a `cluster file` ready for you:
@ -41,7 +41,7 @@ FDB_CLUSTER_FILE=./docker.cluster fdbcli
## Stop the Python demo
```
```bash
docker-compose down
```

View File

@ -20,9 +20,9 @@
ARG FDB_VERSION
FROM foundationdb/foundationdb:${FDB_VERSION} as fdb
FROM python:3.8
FROM python:3.8-slim
RUN apt-get update; apt-get install -y dnsutils
RUN apt-get update; apt-get install -y dnsutils curl && rm -rf /var/lib/apt/lists/*
RUN mkdir -p /app
WORKDIR /app
@ -32,12 +32,11 @@ COPY --from=fdb /usr/bin/fdbcli /usr/bin/
COPY --from=fdb /var/fdb/scripts/create_cluster_file.bash /app
ARG FDB_WEBSITE=https://foundationdb.org
ARG FDB_OLDER_VERSIONS=""
ARG FDB_ADDITIONAL_VERSIONS
ENV FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY=/usr/lib/fdb-multiversion
RUN \
mkdir /usr/lib/fdb-multiversion; \
for version in ${FDB_OLDER_VERSIONS}; do \
wget ${FDB_WEBSITE}/downloads/$version/linux/libfdb_c_$version.so -O /usr/lib/fdb-multiversion/libfdb_c_$version.so; \
RUN mkdir /usr/lib/fdb-multiversion; \
for version in ${FDB_ADDITIONAL_VERSIONS}; do \
curl ${FDB_WEBSITE}/downloads/$version/linux/libfdb_c_$version.so -o /usr/lib/fdb-multiversion/libfdb_c_$version.so; \
done
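# Note: FDB_ADDITIONAL_VERSIONS is expected to be a space-separated version list
# (e.g. "6.2.11"); each downloaded library lands in the directory named by
# FDB_NETWORK_OPTION_EXTERNAL_CLIENT_DIRECTORY above, where the multiversion client loads it.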
COPY requirements.txt /app

View File

@ -18,4 +18,4 @@
#
Flask==1.1.1
foundationdb==6.1.11
foundationdb==6.2.10

View File

@ -17,16 +17,16 @@
# limitations under the License.
#
import os
from flask import Flask
import fdb
import os
app = Flask(__name__)
fdb.api_version(int(os.getenv('FDB_API_VERSION')))
db=fdb.open()
db = fdb.open()
COUNTER_KEY=fdb.tuple.pack(('counter',))
COUNTER_KEY = fdb.tuple.pack(('counter',))
def _increment_counter(tr):
counter_value = tr[COUNTER_KEY]
if counter_value == None:
@ -41,8 +41,8 @@ def get_counter():
counter_value = db[COUNTER_KEY]
if counter_value == None:
return '0'
else:
return str(fdb.tuple.unpack(counter_value)[0])
return str(fdb.tuple.unpack(counter_value)[0])
@app.route("/counter/increment", methods=['POST'])
def increment_counter():

View File

@ -21,14 +21,14 @@
set -xe;
if [[ ! -n "${FDB_CLUSTER_FILE:-}" || ! -s "${FDB_CLUSTER_FILE}" ]]; then
if [[ -z "${FDB_CLUSTER_FILE:-}" || ! -s "${FDB_CLUSTER_FILE}" ]]; then
/app/create_cluster_file.bash
FDB_CLUSTER_FILE="${FDB_CLUSTER_FILE:-/etc/foundationdb/fdb.cluster}"
# Attempt to connect. Configure the database if necessary.
if ! /usr/bin/fdbcli -C $FDB_CLUSTER_FILE --exec status --timeout 3 ; then
if ! /usr/bin/fdbcli -C "${FDB_CLUSTER_FILE}" --exec status --timeout 3 ; then
echo "creating the database"
if ! fdbcli -C $FDB_CLUSTER_FILE --exec "configure new single memory ; status" --timeout 10 ; then
if ! fdbcli -C "${FDB_CLUSTER_FILE}" --exec "configure new single memory ; status" --timeout 10 ; then
echo "Unable to configure new FDB cluster."
exit 1
fi

View File

@ -55,6 +55,7 @@ services:
context: app
args:
FDB_VERSION: ${FDB_VERSION}
FDB_ADDITIONAL_VERSIONS: ${FDB_ADDITIONAL_VERSIONS}
ports:
- 5000:5000/tcp
environment:

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{0AB36B0F-2187-4ECD-9E7E-983EDD966CEB}'
Id='{4F01CFD6-596A-4224-BF7D-4AEF30BA2083}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

View File

@ -198,6 +198,9 @@ if(WITH_PYTHON)
add_fdb_test(
TEST_FILES restarting/from_5.2.0/ClientTransactionProfilingCorrectness-1.txt
restarting/from_5.2.0/ClientTransactionProfilingCorrectness-2.txt)
add_fdb_test(
TEST_FILES restarting/from_7.0.0/UpgradeAndBackupRestore-1.txt
restarting/from_7.0.0/UpgradeAndBackupRestore-2.txt)
add_fdb_test(
TEST_FILES restarting/to_6.3.5/CycleTestRestart-1.txt
restarting/to_6.3.5/CycleTestRestart-2.txt)

View File

@ -0,0 +1,43 @@
testTitle=SubmitBackup
simBackupAgents=BackupToFile
clearAfterTest = false
testName=SubmitBackup
delayFor = 0
stopWhenDone = false
testTitle=FirstCycleTest
clearAfterTest=false
testName=Cycle
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix=BeforeRestart
testName=RandomClogging
testDuration = 90.0
testName=Rollback
meanDelay = 90.0
testDuration = 90.0
testName=Attrition
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
testName=Attrition
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
testTitle=SaveDatabase
clearAfterTest = false
testName=SaveAndKill
restartInfoLocation=simfdb/restartInfo.ini
testDuration=30.0

View File

@ -0,0 +1,56 @@
testTitle=SecondCycleTest
simBackupAgents=BackupToFile
clearAfterTest=false
testName=Cycle
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix=AfterRestart
testName=Cycle
nodeCount = 30000
transactionsPerSecond = 2500.0
testDuration = 30.0
expectedRate = 0
keyPrefix=BeforeRestart
testName=RandomClogging
testDuration = 90.0
testName=Rollback
meanDelay = 90.0
testDuration = 90.0
testName=Attrition
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
testName=Attrition
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 90.0
testTitle=RestoreBackup
simBackupAgents=BackupToFile
clearAfterTest=false
testName=RestoreBackup
tag=default
testTitle=CheckCycles
checkOnly=true
testName=Cycle
nodeCount=30000
keyPrefix=AfterRestart
expectedRate=0
testName=Cycle
nodeCount = 30000
keyPrefix=BeforeRestart
expectedRate = 0