Merge branch 'master' into mengxu/performant-restore-PR
This commit is contained in:
commit
7ff46e6772
|
@ -76,7 +76,7 @@ services:
|
|||
|
||||
snapshot-correctness: &snapshot-correctness
|
||||
<<: *build-setup
|
||||
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure
|
||||
command: scl enable devtoolset-8 python27 rh-python36 rh-ruby24 -- bash -c 'mkdir -p "$${BUILD_DIR}" && cd "$${BUILD_DIR}" && cmake -DCMAKE_COLOR_MAKEFILE=0 -DFDB_RELEASE=1 /__this_is_some_very_long_name_dir_needed_to_fix_a_bug_with_debug_rpms__/foundationdb && make -j "$${MAKEJOBS}" && ctest -j "$${MAKEJOBS}" --output-on-failure'
|
||||
|
||||
prb-correctness:
|
||||
<<: *snapshot-correctness
|
||||
|
|
|
@ -10,38 +10,38 @@ macOS
|
|||
|
||||
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
|
||||
|
||||
* `FoundationDB-6.1.12.pkg <https://www.foundationdb.org/downloads/6.1.12/macOS/installers/FoundationDB-6.1.12.pkg>`_
|
||||
* `FoundationDB-6.2.0.pkg <https://www.foundationdb.org/downloads/6.2.0/macOS/installers/FoundationDB-6.2.0.pkg>`_
|
||||
|
||||
Ubuntu
|
||||
------
|
||||
|
||||
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
|
||||
|
||||
* `foundationdb-clients-6.1.12-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.12/ubuntu/installers/foundationdb-clients_6.1.12-1_amd64.deb>`_
|
||||
* `foundationdb-server-6.1.12-1_amd64.deb <https://www.foundationdb.org/downloads/6.1.12/ubuntu/installers/foundationdb-server_6.1.12-1_amd64.deb>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.2.0-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.0/ubuntu/installers/foundationdb-clients_6.2.0-1_amd64.deb>`_
|
||||
* `foundationdb-server-6.2.0-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.0/ubuntu/installers/foundationdb-server_6.2.0-1_amd64.deb>`_ (depends on the clients package)
|
||||
|
||||
RHEL/CentOS EL6
|
||||
---------------
|
||||
|
||||
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
|
||||
|
||||
* `foundationdb-clients-6.1.12-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.12/rhel6/installers/foundationdb-clients-6.1.12-1.el6.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.1.12-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.12/rhel6/installers/foundationdb-server-6.1.12-1.el6.x86_64.rpm>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.2.0-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.0/rhel6/installers/foundationdb-clients-6.2.0-1.el6.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.2.0-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.0/rhel6/installers/foundationdb-server-6.2.0-1.el6.x86_64.rpm>`_ (depends on the clients package)
|
||||
|
||||
RHEL/CentOS EL7
|
||||
---------------
|
||||
|
||||
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
|
||||
|
||||
* `foundationdb-clients-6.1.12-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.12/rhel7/installers/foundationdb-clients-6.1.12-1.el7.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.1.12-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.1.12/rhel7/installers/foundationdb-server-6.1.12-1.el7.x86_64.rpm>`_ (depends on the clients package)
|
||||
* `foundationdb-clients-6.2.0-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.0/rhel7/installers/foundationdb-clients-6.2.0-1.el7.x86_64.rpm>`_
|
||||
* `foundationdb-server-6.2.0-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.0/rhel7/installers/foundationdb-server-6.2.0-1.el7.x86_64.rpm>`_ (depends on the clients package)
|
||||
|
||||
Windows
|
||||
-------
|
||||
|
||||
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
|
||||
|
||||
* `foundationdb-6.1.12-x64.msi <https://www.foundationdb.org/downloads/6.1.12/windows/installers/foundationdb-6.1.12-x64.msi>`_
|
||||
* `foundationdb-6.2.0-x64.msi <https://www.foundationdb.org/downloads/6.2.0/windows/installers/foundationdb-6.2.0-x64.msi>`_
|
||||
|
||||
API Language Bindings
|
||||
=====================
|
||||
|
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
|
|||
|
||||
If you need to use the FoundationDB Python API from other Python installations or paths, download the Python package:
|
||||
|
||||
* `foundationdb-6.1.12.tar.gz <https://www.foundationdb.org/downloads/6.1.12/bindings/python/foundationdb-6.1.12.tar.gz>`_
|
||||
* `foundationdb-6.2.0.tar.gz <https://www.foundationdb.org/downloads/6.2.0/bindings/python/foundationdb-6.2.0.tar.gz>`_
|
||||
|
||||
Ruby 1.9.3/2.0.0+
|
||||
-----------------
|
||||
|
||||
* `fdb-6.1.12.gem <https://www.foundationdb.org/downloads/6.1.12/bindings/ruby/fdb-6.1.12.gem>`_
|
||||
* `fdb-6.2.0.gem <https://www.foundationdb.org/downloads/6.2.0/bindings/ruby/fdb-6.2.0.gem>`_
|
||||
|
||||
Java 8+
|
||||
-------
|
||||
|
||||
* `fdb-java-6.1.12.jar <https://www.foundationdb.org/downloads/6.1.12/bindings/java/fdb-java-6.1.12.jar>`_
|
||||
* `fdb-java-6.1.12-javadoc.jar <https://www.foundationdb.org/downloads/6.1.12/bindings/java/fdb-java-6.1.12-javadoc.jar>`_
|
||||
* `fdb-java-6.2.0.jar <https://www.foundationdb.org/downloads/6.2.0/bindings/java/fdb-java-6.2.0.jar>`_
|
||||
* `fdb-java-6.2.0-javadoc.jar <https://www.foundationdb.org/downloads/6.2.0/bindings/java/fdb-java-6.2.0-javadoc.jar>`_
|
||||
|
||||
Go 1.11+
|
||||
--------
|
||||
|
|
|
@ -74,7 +74,7 @@ Example: A Server Interface
|
|||
|
||||
Below is an actor that runs on a single server communicating over the network. Its functionality is to maintain a count in response to asynchronous messages from other actors. It supports an interface implemented with a loop containing a ``choose`` statement with a ``when`` for each request type. Each ``when`` uses ``waitNext()`` to asynchronously wait for the next request in the stream. The add and subtract interfaces modify the count itself, stored with a state variable. The get interface takes a ``Promise<int>`` instead of just an ``int`` to facilitate sending back the return message.
|
||||
|
||||
To write the equivalent code directly in C++, a developer would have to implement a complex set of callbacks with exception-handling, requiring far more engineering effort. Flow makes it much easier to implement this sort of asynchronous coordination, with no loss of performance.:
|
||||
To write the equivalent code directly in C++, a developer would have to implement a complex set of callbacks with exception-handling, requiring far more engineering effort. Flow makes it much easier to implement this sort of asynchronous coordination, with no loss of performance:
|
||||
|
||||
.. code-block:: c
|
||||
|
||||
|
|
|
@ -5,43 +5,40 @@ Release Notes
|
|||
6.2.0
|
||||
=====
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
* Improved team collection for data distribution that builds a balanced number of teams per server and guarantees that each server has at least one team. `(PR #1785) <https://github.com/apple/foundationdb/pull/1785>`_.
|
||||
* Added the option to have data distribution FetchKeys to run at a lower priority by setting the knob ``FETCH_KEYS_LOWER_PRIORITY`` `(PR #1791) <https://github.com/apple/foundationdb/pull/1791>`_.
|
||||
* CMake is now our official build system. The Makefile based build system is deprecated.
|
||||
* Added local ratekeeper, to throttle reads at a per-storage-process level. `(PR #1477) <https://github.com/apple/foundationdb/pull/1477>`_.
|
||||
* FDB backups based on disk snapshots, provides an ability to take cluster level backup based on disk level snapshots of storage, tlogs and coordinators. `(PR #1733) <https://github.com/apple/foundationdb/pull/1733>`_.
|
||||
|
||||
* FoundationDB now uses the flatbuffers serialization format for all network messages by default. This can be controlled with the ``--object-serializer`` cli argument or ``use_object_serializer`` network option. Note that network communications only work if each peer uses the same object serializer setting. `(PR #1090) <https://github.com/apple/foundationdb/pull/1090>`_.
|
||||
|
||||
Performance
|
||||
-----------
|
||||
|
||||
* A new transaction log spilling implementation is now the default. Write bandwidth and latency will no longer degrade during storage server or remote region failures. `(PR #1731) <https://github.com/apple/foundationdb/pull/1731>`_.
|
||||
* Storage servers will locally throttle incoming read traffic when they are falling behind. `(PR #1477) <https://github.com/apple/foundationdb/pull/1477>`_.
|
||||
* Use CRC32 checksum for SQLite pages. `(PR #1582) <https://github.com/apple/foundationdb/pull/1582>`_.
|
||||
* Added a 96-byte fast allocator, so storage queue nodes use less memory. `(PR #1336) <https://github.com/apple/foundationdb/pull/1336>`_.
|
||||
* Handle large packets better. `(PR #1684) <https://github.com/apple/foundationdb/pull/1684>`_.
|
||||
* A new Transaction Log spilling implementation is now the default. Write bandwidth and latency will no longer degrade during storage server or remote region failures. `(PR #1731) <https://github.com/apple/foundationdb/pull/1731>`_.
|
||||
* Log routers will prefer to peek from satellites at ``log_version >= 4``. `(PR #1795) <https://github.com/apple/foundationdb/pull/1795>`_.
|
||||
* Improved network performance when sending large packets. `(PR #1684) <https://github.com/apple/foundationdb/pull/1684>`_.
|
||||
* Spilled data can be consumed from transaction logs more quickly and with less overhead. `(PR #1584) <https://github.com/apple/foundationdb/pull/1584>`_.
|
||||
* Improved the speed of recoveries on large clusters. `(PR #1729) <https://github.com/apple/foundationdb/pull/1729>`_.
|
||||
* Monitor leader only when proxies are unknown or any dies. `(PR #1059) <https://github.com/apple/foundationdb/pull/1059>`_.
|
||||
* Clients no longer talk to cluster controller for failure monitoring. `(PR #1640) <https://github.com/apple/foundationdb/pull/1640>`_.
|
||||
* Make clients cheaper by reducing the connection monitoring messages between clients and servers and ensuring that unused connections are destroyed. `(PR #1768) <https://github.com/apple/foundationdb/pull/1768>`_.
|
||||
* Clients no longer talk to the cluster controller for failure monitoring information. `(PR #1640) <https://github.com/apple/foundationdb/pull/1640>`_.
|
||||
* Reduced the number of connection monitoring messages between clients and servers. `(PR #1768) <https://github.com/apple/foundationdb/pull/1768>`_.
|
||||
* Close connections which have been idle for a long period of time. `(PR #1768) <https://github.com/apple/foundationdb/pull/1768>`_.
|
||||
* Each client connects to exactly one coordinator, and at most five proxies. `(PR #1909) <https://github.com/apple/foundationdb/pull/1909>`_.
|
||||
* Ratekeeper will throttle traffic when too many storage servers are not making versions durable fast enough. `(PR #1784) <https://github.com/apple/foundationdb/pull/1784>`_.
|
||||
* Storage servers recovering a memory storage engine will abort recovery if the cluster is already healthy. `(PR #1713) <https://github.com/apple/foundationdb/pull/1713>`_.
|
||||
* Improved how the data distribution algorithm balances data across teams of storage servers. `(PR #1785) <https://github.com/apple/foundationdb/pull/1785>`_.
|
||||
* Lowered the priority for data distribution team removal, to avoid prioritizing team removal work over splitting shards. `(PR #1853) <https://github.com/apple/foundationdb/pull/1853>`_.
|
||||
* Made the storage cache eviction policy configurable, and added an LRU policy. `(PR #1506) <https://github.com/apple/foundationdb/pull/1506>`_.
|
||||
* Improved the speed of recoveries on large clusters at ``log_version >= 4``. `(PR #1729) <https://github.com/apple/foundationdb/pull/1729>`_.
|
||||
* Log routers will prefer to peek from satellites at ``log_version >= 4``. `(PR #1795) <https://github.com/apple/foundationdb/pull/1795>`_.
|
||||
|
||||
Fixes
|
||||
-----
|
||||
|
||||
* Set the priority of redundant teams to remove as PRIORITY_TEAM_REDUNDANT, instead of PRIORITY_TEAM_UNHEALTHY. `(PR #1802) <https://github.com/apple/foundationdb/pull/1802>`_.
|
||||
* During an upgrade, the multi-version client now persists database default options and transaction options that aren't reset on retry (e.g. transaction timeout). In order for these options to function correctly during an upgrade, a 6.2 or later client should be used as the primary client. `(PR #1767) <https://github.com/apple/foundationdb/pull/1767>`_.
|
||||
* If a cluster is upgraded during an ``onError`` call, the cluster could return a ``cluster_version_changed`` error. `(PR #1734) <https://github.com/apple/foundationdb/pull/1734>`_.
|
||||
* Do not set doBuildTeams in StorageServerTracker unless a storage server's interface changes, in order to avoid unnecessary work. `(PR #1779) <https://github.com/apple/foundationdb/pull/1779>`_.
|
||||
* Data distribution will now pick a random destination when merging shards in the ``\xff`` keyspace. This avoids an issue with backup where the write-heavy mutation log shards could concentrate on a single process that has less data than everybody else. `(PR #1916) <https://github.com/apple/foundationdb/pull/1916>`_.
|
||||
* Setting ``--machine_id`` (or ``-i``) for an ``fdbserver`` process now sets ``locality_machineid`` in addition to ``locality_zoneid``. `(PR #1928) <https://github.com/apple/foundationdb/pull/1928>`_.
|
||||
* File descriptors opened by clients and servers set close-on-exec, if available on the platform. `(PR #1581) <https://github.com/apple/foundationdb/pull/1581>`_.
|
||||
* ``fdbrestore`` commands other than ``start`` required a default cluster file to be found but did not actually use it. `(PR #1912) <https://github.com/apple/foundationdb/pull/1912>`_.
|
||||
* Fix reference counting used for managing peer connections. `(PR #1768) <https://github.com/apple/foundationdb/pull/1768>`_.
|
||||
* Unneeded network connections were not being closed because peer reference counts were handled improperly. `(PR #1768) <https://github.com/apple/foundationdb/pull/1768>`_.
|
||||
* Under certain conditions, cross region replication could stall for 10 minute periods. `(PR #1818) <https://github.com/apple/foundationdb/pull/1818>`_.
|
||||
* In very rare scenarios, master recovery would restart because system metadata was loaded incorrectly. `(PR #1919) <https://github.com/apple/foundationdb/pull/1919>`_.
|
||||
* Ratekeeper will aggressively throttle when unable to fetch the list of storage servers for a considerable period of time. `(PR #1858) <https://github.com/apple/foundationdb/pull/1858>`_.
|
||||
|
||||
Status
|
||||
------
|
||||
|
@ -53,34 +50,39 @@ Status
|
|||
* Added ``local_rate`` to the ``roles`` section to record the throttling rate of the local ratekeeper `(PR #1712) <http://github.com/apple/foundationdb/pull/1712>`_.
|
||||
* Renamed ``cluster.fault_tolerance`` fields ``max_machines_without_losing_availability`` and ``max_machines_without_losing_data`` to ``max_zones_without_losing_availability`` and ``max_zones_without_losing_data`` `(PR #1925) <https://github.com/apple/foundationdb/pull/1925>`_.
|
||||
* ``fdbcli`` status now reports the configured zone count. The fault tolerance is now reported in terms of the number of zones unless machine IDs are being used as zone IDs. `(PR #1924) <https://github.com/apple/foundationdb/pull/1924>`_.
|
||||
* ``connected_clients`` is now only a sample of the connected clients, rather than a complete list. `(PR #1902) <https://github.com/apple/foundationdb/pull/1902>`_.
|
||||
* Added ``max_protocol_clients`` to the ``supported_versions`` section, which provides a sample of connected clients which cannot connect to any higher protocol version. `(PR #1902) <https://github.com/apple/foundationdb/pull/1902>`_.
|
||||
|
||||
Bindings
|
||||
--------
|
||||
|
||||
* Add a transaction size limit for both database option and transaction option. `(PR #1725) <https://github.com/apple/foundationdb/pull/1725>`_.
|
||||
* Add a transaction size limit as both a database option and a transaction option. `(PR #1725) <https://github.com/apple/foundationdb/pull/1725>`_.
|
||||
* Added a new API to get the approximated transaction size before commit, e.g., ``fdb_transaction_get_approximate_size`` in the C binding. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
|
||||
* C: ``fdb_future_get_version`` has been renamed to ``fdb_future_get_int64``. `(PR #1756) <https://github.com/apple/foundationdb/pull/1756>`_.
|
||||
* C: Applications linking to libfdb_c can now use ``pkg-config foundationdb-client`` or ``find_package(FoundationDB-Client ...)`` (for cmake) to get the proper flags for compiling and linking. `(PR #1636) <https://github.com/apple/foundationdb/pull/1636>`_.
|
||||
* C: Applications linking to ``libfdb_c`` can now use ``pkg-config foundationdb-client`` or ``find_package(FoundationDB-Client ...)`` (for cmake) to get the proper flags for compiling and linking. `(PR #1636) <https://github.com/apple/foundationdb/pull/1636>`_.
|
||||
* Go: The Go bindings now require Go version 1.11 or later.
|
||||
* Go: Fix issue with finalizers running too early that could lead to undefined behavior. `(PR #1451) <https://github.com/apple/foundationdb/pull/1451>`_.
|
||||
* Added transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) <https://github.com/apple/foundationdb/pull/1844>`_.
|
||||
* Go: Finalizers could run too early leading to undefined behavior. `(PR #1451) <https://github.com/apple/foundationdb/pull/1451>`_.
|
||||
* Added a transaction option to control the field length of keys and values in debug transaction logging in order to avoid truncation. `(PR #1844) <https://github.com/apple/foundationdb/pull/1844>`_.
|
||||
|
||||
Other Changes
|
||||
-------------
|
||||
|
||||
* Added the primitives for FDB backups based on disk snapshots. This provides an ability to take a cluster level backup based on disk level snapshots of the storage, tlogs and coordinators. `(PR #1733) <https://github.com/apple/foundationdb/pull/1733>`_.
|
||||
* FoundationDB now uses the flatbuffers serialization format for all network messages. `(PR #1090) <https://github.com/apple/foundationdb/pull/1090>`_.
|
||||
* Clients will throw ``transaction_too_old`` when attempting to read if ``setVersion`` was called with a version smaller than the smallest read version obtained from the cluster. This is a protection against reading from the wrong cluster in multi-cluster scenarios. `(PR #1413) <https://github.com/apple/foundationdb/pull/1413>`_.
|
||||
* Trace files are now ordered lexicographically. This means that the filename format for trace files did change. `(PR #1828) <https://github.com/apple/foundationdb/pull/1828>`_.
|
||||
* Trace files are now ordered lexicographically. This means that the filename format for trace files has changed. `(PR #1828) <https://github.com/apple/foundationdb/pull/1828>`_.
|
||||
* Improved ``TransactionMetrics`` log events by adding a random UID to distinguish multiple open connections, a flag to identify internal vs. client connections, and logging of rates and roughness in addition to total count for several metrics. `(PR #1808) <https://github.com/apple/foundationdb/pull/1808>`_.
|
||||
* FoundationDB can now be built with clang and libc++ on Linux `(PR #1666) <https://github.com/apple/foundationdb/pull/1666>`_.
|
||||
* Added experimental framework to run C and Java clients in simulator `(PR #1678) <https://github.com/apple/foundationdb/pull/1678>`_.
|
||||
* Added new network option for client buggify which will randomly throw expected exceptions in the client. Intended for client testing `(PR #1417) <https://github.com/apple/foundationdb/pull/1417>`_.
|
||||
* FoundationDB can now be built with clang and libc++ on Linux. `(PR #1666) <https://github.com/apple/foundationdb/pull/1666>`_.
|
||||
* Added experimental framework to run C and Java clients in simulator. `(PR #1678) <https://github.com/apple/foundationdb/pull/1678>`_.
|
||||
* Added new network options for client buggify which will randomly throw expected exceptions in the client. This is intended to be used for client testing. `(PR #1417) <https://github.com/apple/foundationdb/pull/1417>`_.
|
||||
* Added ``--cache_memory`` parameter for ``fdbserver`` processes to control the amount of memory dedicated to caching pages read from disk. `(PR #1889) <https://github.com/apple/foundationdb/pull/1889>`_.
|
||||
* Added ``MakoWorkload``, used as a benchmark to do performance testing of FDB. `(PR #1586) <https://github.com/apple/foundationdb/pull/1586>`_.
|
||||
* Added two knobs ``LOAD_BALANCE_ZONE_ID_LOCALITY_ENABLED`` and ``LOAD_BALANCE_DC_ID_LOCALITY_ENABLED`` allowing locality-based decision-making to be toggled on/off during load balancing. `(PR #1820) <https://github.com/apple/foundationdb/pull/1820>`_.
|
||||
* Ratekeeper will aggressively throttle when unable to fetch the list of storage servers for a considerable period of time. `(PR #1858) <https://github.com/apple/foundationdb/pull/1858>`_.
|
||||
* ``fdbserver`` now accepts a comma separated list of public and listen addresses. `(PR #1721) <https://github.com/apple/foundationdb/pull/1721>`_.
|
||||
* ``CAUSAL_READ_RISKY`` has been enhanced to further reduce the chance of causally inconsistent reads. Existing users of ``CAUSAL_READ_RISKY`` may see increased GRV latency if proxies are distantly located from logs. `(PR #1841) <https://github.com/apple/foundationdb/pull/1841>`_.
|
||||
* Added ``no_wait`` option in ``fdbcli`` exclude command to avoid blocking. `(PR #1852) <https://github.com/apple/foundationdb/pull/1852>`_.
|
||||
* ``CAUSAL_READ_RISKY`` can be turned on for all transactions using a database option. `(PR #1841) <https://github.com/apple/foundationdb/pull/1841>`_.
|
||||
* Added a ``no_wait`` option to the ``fdbcli`` exclude command to avoid blocking. `(PR #1852) <https://github.com/apple/foundationdb/pull/1852>`_.
|
||||
* Idle clusters will fsync much less frequently. `(PR #1697) <https://github.com/apple/foundationdb/pull/1697>`_.
|
||||
* CMake is now the official build system. The Makefile based build system is deprecated.
|
||||
|
||||
Earlier release notes
|
||||
---------------------
|
||||
|
|
|
@ -126,7 +126,6 @@ enum {
|
|||
OPT_CLEANUP,
|
||||
|
||||
OPT_TRACE_FORMAT,
|
||||
OPT_USE_OBJECT_SERIALIZER
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgAgentOptions[] = {
|
||||
|
@ -143,8 +142,6 @@ CSimpleOpt::SOption g_rgAgentOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_CRASHONERROR, "--crash", SO_NONE },
|
||||
{ OPT_LOCALITY, "--locality_", SO_REQ_SEP },
|
||||
|
@ -184,8 +181,6 @@ CSimpleOpt::SOption g_rgBackupStartOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -253,8 +248,6 @@ CSimpleOpt::SOption g_rgBackupStatusOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_VERSION, "--version", SO_NONE },
|
||||
{ OPT_VERSION, "-v", SO_NONE },
|
||||
|
@ -285,8 +278,6 @@ CSimpleOpt::SOption g_rgBackupAbortOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -318,8 +309,6 @@ CSimpleOpt::SOption g_rgBackupDiscontinueOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -351,8 +340,6 @@ CSimpleOpt::SOption g_rgBackupWaitOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -380,8 +367,6 @@ CSimpleOpt::SOption g_rgBackupPauseOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -411,8 +396,6 @@ CSimpleOpt::SOption g_rgBackupExpireOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -450,8 +433,6 @@ CSimpleOpt::SOption g_rgBackupDeleteOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -483,8 +464,6 @@ CSimpleOpt::SOption g_rgBackupDescribeOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -549,8 +528,6 @@ CSimpleOpt::SOption g_rgBackupListOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -593,8 +570,6 @@ CSimpleOpt::SOption g_rgRestoreOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -632,8 +607,6 @@ CSimpleOpt::SOption g_rgDBAgentOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_CRASHONERROR, "--crash", SO_NONE },
|
||||
{ OPT_LOCALITY, "--locality_", SO_REQ_SEP },
|
||||
|
@ -664,8 +637,6 @@ CSimpleOpt::SOption g_rgDBStartOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -699,8 +670,6 @@ CSimpleOpt::SOption g_rgDBStatusOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_VERSION, "--version", SO_NONE },
|
||||
{ OPT_VERSION, "-v", SO_NONE },
|
||||
|
@ -732,8 +701,6 @@ CSimpleOpt::SOption g_rgDBSwitchOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -767,8 +734,6 @@ CSimpleOpt::SOption g_rgDBAbortOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -798,8 +763,6 @@ CSimpleOpt::SOption g_rgDBPauseOptions[] = {
|
|||
{ OPT_TRACE, "--log", SO_NONE },
|
||||
{ OPT_TRACE_DIR, "--logdir", SO_REQ_SEP },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_TRACE_LOG_GROUP, "--loggroup", SO_REQ_SEP },
|
||||
{ OPT_QUIET, "-q", SO_NONE },
|
||||
{ OPT_QUIET, "--quiet", SO_NONE },
|
||||
|
@ -2893,7 +2856,6 @@ int main(int argc, char* argv[]) {
|
|||
bool dryRun = false;
|
||||
std::string traceDir = "";
|
||||
std::string traceFormat = "";
|
||||
bool useObjectSerializer = true;
|
||||
std::string traceLogGroup;
|
||||
uint64_t traceRollSize = TRACE_DEFAULT_ROLL_SIZE;
|
||||
uint64_t traceMaxLogsSize = TRACE_DEFAULT_MAX_LOGS_SIZE;
|
||||
|
@ -3004,18 +2966,6 @@ int main(int argc, char* argv[]) {
|
|||
}
|
||||
traceFormat = args->OptionArg();
|
||||
break;
|
||||
case OPT_USE_OBJECT_SERIALIZER: {
|
||||
std::string s = args->OptionArg();
|
||||
std::transform(s.begin(), s.end(), s.begin(), ::tolower);
|
||||
if (s == "on" || s == "true" || s == "1") {
|
||||
useObjectSerializer = true;
|
||||
} else if (s == "off" || s == "false" || s == "0") {
|
||||
useObjectSerializer = false;
|
||||
} else {
|
||||
fprintf(stderr, "ERROR: Could not parse object serializer option: `%s'\n", s.c_str());
|
||||
}
|
||||
break;
|
||||
}
|
||||
case OPT_TRACE_LOG_GROUP:
|
||||
traceLogGroup = args->OptionArg();
|
||||
break;
|
||||
|
@ -3369,11 +3319,6 @@ int main(int argc, char* argv[]) {
|
|||
setNetworkOption(FDBNetworkOptions::ENABLE_SLOW_TASK_PROFILING);
|
||||
}
|
||||
setNetworkOption(FDBNetworkOptions::DISABLE_CLIENT_STATISTICS_LOGGING);
|
||||
// The USE_OBJECT_SERIALIZER network option expects an 8 byte little endian integer which is interpreted as
|
||||
// zero = false, non-zero = true.
|
||||
setNetworkOption(FDBNetworkOptions::USE_OBJECT_SERIALIZER,
|
||||
useObjectSerializer ? LiteralStringRef("\x01\x00\x00\x00\x00\x00\x00\x00")
|
||||
: LiteralStringRef("\x00\x00\x00\x00\x00\x00\x00\x00"));
|
||||
|
||||
// deferred TLS options
|
||||
if (tlsCertPath.size()) {
|
||||
|
|
|
@ -70,8 +70,7 @@ enum {
|
|||
OPT_NO_STATUS,
|
||||
OPT_STATUS_FROM_JSON,
|
||||
OPT_VERSION,
|
||||
OPT_TRACE_FORMAT,
|
||||
OPT_USE_OBJECT_SERIALIZER
|
||||
OPT_TRACE_FORMAT
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONNFILE, "-C", SO_REQ_SEP },
|
||||
|
@ -89,8 +88,6 @@ CSimpleOpt::SOption g_rgOptions[] = { { OPT_CONNFILE, "-C", SO_REQ_SEP },
|
|||
{ OPT_VERSION, "--version", SO_NONE },
|
||||
{ OPT_VERSION, "-v", SO_NONE },
|
||||
{ OPT_TRACE_FORMAT, "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
|
||||
#ifndef TLS_DISABLED
|
||||
TLS_OPTION_FLAGS
|
||||
|
@ -489,10 +486,6 @@ void initHelp() {
|
|||
"include all|<ADDRESS>*",
|
||||
"permit previously-excluded servers to rejoin the database",
|
||||
"If `all' is specified, the excluded servers list is cleared.\n\nFor each IP address or IP:port pair in <ADDRESS>*, removes any matching exclusions from the excluded servers list. (A specified IP will match all IP:* exclusion entries)");
|
||||
helpMap["snapshot"] = CommandHelp(
|
||||
"snapshot <BINARY-PATH>:<ARG1=VAL1>,<ARG2=VAL2>,...",
|
||||
"snapshot the database",
|
||||
"invokes binary provided in binary-path with the arg,value pairs on TLog, Storage and Coordinators nodes. UID is a reserved ARG key.");
|
||||
helpMap["setclass"] = CommandHelp(
|
||||
"setclass <ADDRESS> <unset|storage|transaction|default>",
|
||||
"change the class of a process",
|
||||
|
@ -558,11 +551,12 @@ void initHelp() {
|
|||
"Calling this command with `on' prevents data distribution from moving data away from the processes with the specified ZONEID. Data distribution will automatically be turned back on for ZONEID after the specified SECONDS have elapsed, or after a storage server with a different ZONEID fails. Only one ZONEID can be marked for maintenance. Calling this command with no arguments will display any ongoing maintenance. Calling this command with `off' will disable maintenance.\n");
|
||||
helpMap["consistencycheck"] = CommandHelp(
|
||||
"consistencycheck [on|off]",
|
||||
"enables or disables consistencycheck",
|
||||
"Calling this command with `on' enables consistency check to run and `off' will disable the same. Calling this command with no arguments will display setting for consistency check.\n");
|
||||
"permits or prevents consistency checking",
|
||||
"Calling this command with `on' permits consistency check processes to run and `off' will halt their checking. Calling this command with no arguments will display if consistency checking is currently allowed.\n");
|
||||
|
||||
hiddenCommands.insert("expensive_data_check");
|
||||
hiddenCommands.insert("datadistribution");
|
||||
hiddenCommands.insert("snapshot");
|
||||
}
|
||||
|
||||
void printVersion() {
|
||||
|
@ -2415,7 +2409,6 @@ struct CLIOptions {
|
|||
bool trace;
|
||||
std::string traceDir;
|
||||
std::string traceFormat;
|
||||
bool useObjectSerializer = true;
|
||||
int exit_timeout;
|
||||
Optional<std::string> exec;
|
||||
bool initialStatusCheck;
|
||||
|
@ -2517,20 +2510,6 @@ struct CLIOptions {
|
|||
}
|
||||
traceFormat = args.OptionArg();
|
||||
break;
|
||||
case OPT_USE_OBJECT_SERIALIZER: {
|
||||
std::string s = args.OptionArg();
|
||||
std::transform(s.begin(), s.end(), s.begin(), ::tolower);
|
||||
if (s == "on" || s == "true" || s == "1") {
|
||||
useObjectSerializer = true;
|
||||
} else if (s == "off" || s == "false" || s == "0") {
|
||||
useObjectSerializer = false;
|
||||
} else {
|
||||
fprintf(stderr, "ERROR: Could not parse object serializer option: `%s'\n", s.c_str());
|
||||
printProgramUsage(program_name.c_str());
|
||||
flushAndExit(FDB_EXIT_ERROR);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case OPT_VERSION:
|
||||
printVersion();
|
||||
return FDB_EXIT_SUCCESS;
|
||||
|
@ -3647,12 +3626,6 @@ int main(int argc, char **argv) {
|
|||
}
|
||||
setNetworkOption(FDBNetworkOptions::ENABLE_SLOW_TASK_PROFILING);
|
||||
}
|
||||
// The USE_OBJECT_SERIALIZER network option expects an 8 byte little endian integer which is interpreted as zero =
|
||||
// false, non-zero = true.
|
||||
setNetworkOption(FDBNetworkOptions::USE_OBJECT_SERIALIZER,
|
||||
opt.useObjectSerializer ? LiteralStringRef("\x01\x00\x00\x00\x00\x00\x00\x00")
|
||||
: LiteralStringRef("\x00\x00\x00\x00\x00\x00\x00\x00"));
|
||||
|
||||
initHelp();
|
||||
|
||||
// deferred TLS options
|
||||
|
|
|
@ -511,7 +511,7 @@ ACTOR Future<Void> asyncDeserializeClusterInterface(Reference<AsyncVar<Value>> s
|
|||
Reference<AsyncVar<Optional<ClusterInterface>>> outKnownLeader) {
|
||||
state Reference<AsyncVar<Optional<ClusterControllerClientInterface>>> knownLeader(
|
||||
new AsyncVar<Optional<ClusterControllerClientInterface>>{});
|
||||
state Future<Void> deserializer = asyncDeserialize(serializedInfo, knownLeader, g_network->useObjectSerializer());
|
||||
state Future<Void> deserializer = asyncDeserialize(serializedInfo, knownLeader, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
loop {
|
||||
choose {
|
||||
when(wait(deserializer)) { UNSTOPPABLE_ASSERT(false); }
|
||||
|
@ -645,7 +645,7 @@ ACTOR Future<Void> monitorLeaderForProxies( Key clusterKey, vector<NetworkAddres
|
|||
}
|
||||
|
||||
if (leader.get().first.serializedInfo.size()) {
|
||||
if (g_network->useObjectSerializer()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
ObjectReader reader(leader.get().first.serializedInfo.begin(), IncludeVersion());
|
||||
ClusterControllerClientInterface res;
|
||||
reader.deserialize(res);
|
||||
|
|
|
@ -67,7 +67,7 @@ template <class LeaderInterface>
|
|||
struct LeaderDeserializer {
|
||||
Future<Void> operator()(const Reference<AsyncVar<Value>>& serializedInfo,
|
||||
const Reference<AsyncVar<Optional<LeaderInterface>>>& outKnownLeader) {
|
||||
return asyncDeserialize(serializedInfo, outKnownLeader, g_network->useObjectSerializer());
|
||||
return asyncDeserialize(serializedInfo, outKnownLeader, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -837,9 +837,6 @@ const UniqueOrderedOptionList<FDBTransactionOptions>& Database::getTransactionDe
|
|||
void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> value) {
|
||||
switch(option) {
|
||||
// SOMEDAY: If the network is already started, should these three throw an error?
|
||||
case FDBNetworkOptions::USE_OBJECT_SERIALIZER:
|
||||
networkOptions.useObjectSerializer = extractIntOption(value) != 0;
|
||||
break;
|
||||
case FDBNetworkOptions::TRACE_ENABLE:
|
||||
networkOptions.traceDirectory = value.present() ? value.get().toString() : "";
|
||||
break;
|
||||
|
@ -990,7 +987,7 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
|
|||
if (!networkOptions.logClientInfo.present())
|
||||
networkOptions.logClientInfo = true;
|
||||
|
||||
g_network = newNet2(false, useMetrics || networkOptions.traceDirectory.present(), networkOptions.useObjectSerializer);
|
||||
g_network = newNet2(false, useMetrics || networkOptions.traceDirectory.present());
|
||||
FlowTransport::createInstance(true, transportId);
|
||||
Net2FileSystem::newFileSystem();
|
||||
|
||||
|
@ -3351,7 +3348,7 @@ void enableClientInfoLogging() {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef snapPayload, UID snapUID, Optional<UID> debugID) {
|
||||
TraceEvent("NativeAPI.SnapshotDatabaseEnter")
|
||||
TraceEvent("SnapshotDatabaseEnter")
|
||||
.detail("SnapPayload", snapPayload)
|
||||
.detail("SnapUID", snapUID);
|
||||
try {
|
||||
|
@ -3368,10 +3365,10 @@ ACTOR Future<Void> snapshotDatabase(Reference<DatabaseContext> cx, StringRef sna
|
|||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
TraceEvent("NativeAPI.SnapshotDatabaseError")
|
||||
TraceEvent("SnapshotDatabaseError")
|
||||
.error(e)
|
||||
.detail("SnapPayload", snapPayload)
|
||||
.detail("SnapUID", snapUID)
|
||||
.error(e, true /* includeCancelled */);
|
||||
.detail("SnapUID", snapUID);
|
||||
throw;
|
||||
}
|
||||
return Void();
|
||||
|
|
|
@ -61,13 +61,12 @@ struct NetworkOptions {
|
|||
Optional<bool> logClientInfo;
|
||||
Standalone<VectorRef<ClientVersionRef>> supportedVersions;
|
||||
bool slowTaskProfilingEnabled;
|
||||
bool useObjectSerializer;
|
||||
|
||||
// The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h.
|
||||
NetworkOptions()
|
||||
: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
|
||||
traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
|
||||
traceFormat("xml"), slowTaskProfilingEnabled(false), useObjectSerializer(true) {}
|
||||
traceFormat("xml"), slowTaskProfilingEnabled(false) {}
|
||||
};
|
||||
|
||||
class Database {
|
||||
|
|
|
@ -36,14 +36,6 @@ const KeyRef keyServersEnd = keyServersKeys.end;
|
|||
const KeyRangeRef keyServersKeyServersKeys ( LiteralStringRef("\xff/keyServers/\xff/keyServers/"), LiteralStringRef("\xff/keyServers/\xff/keyServers0"));
|
||||
const KeyRef keyServersKeyServersKey = keyServersKeyServersKeys.begin;
|
||||
|
||||
// list of reserved exec commands
|
||||
const StringRef execSnap = LiteralStringRef("snap"); // snapshot persistent state of
|
||||
// storage, TLog and coordinated state
|
||||
const StringRef execDisableTLogPop = LiteralStringRef("\xff/TLogDisablePop"); // disable pop on TLog
|
||||
const StringRef execEnableTLogPop = LiteralStringRef("\xff/TLogEnablePop"); // enable pop on TLog
|
||||
// used to communicate snap failures between TLog and SnapTest Workload, used only in simulator
|
||||
const StringRef snapTestFailStatus = LiteralStringRef("\xff/SnapTestFailStatus/");
|
||||
|
||||
const Key keyServersKey( const KeyRef& k ) {
|
||||
return k.withPrefix( keyServersPrefix );
|
||||
}
|
||||
|
|
|
@ -306,10 +306,6 @@ extern const KeyRef rebalanceDDIgnoreKey;
|
|||
|
||||
const Value healthyZoneValue( StringRef const& zoneId, Version version );
|
||||
std::pair<Key,Version> decodeHealthyZoneValue( ValueRef const& );
|
||||
extern const StringRef execSnap;
|
||||
extern const StringRef execDisableTLogPop;
|
||||
extern const StringRef execEnableTLogPop;
|
||||
extern const StringRef snapTestFailStatus;
|
||||
|
||||
// All mutations done to this range are blindly copied into txnStateStore.
|
||||
// Used to create artifically large txnStateStore instances in testing.
|
||||
|
|
|
@ -33,9 +33,6 @@ description is not currently required but encouraged.
|
|||
<Option name="local_address" code="10"
|
||||
paramType="String" paramDescription="IP:PORT"
|
||||
description="Deprecated"/>
|
||||
<Option name="use_object_serializer" code="11"
|
||||
paramType="Int" paramDescription="0 is false, every other value is true"
|
||||
description="enable the object serializer for network communication"/>
|
||||
<Option name="cluster_file" code="20"
|
||||
paramType="String" paramDescription="path to cluster file"
|
||||
description="Deprecated"/>
|
||||
|
|
|
@ -201,7 +201,6 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
|
|||
virtual void run() { return baseNetwork->run(); }
|
||||
virtual void getDiskBytes(std::string const& directory, int64_t& free, int64_t& total) { return baseNetwork->getDiskBytes(directory,free,total); }
|
||||
virtual bool isAddressOnThisHost(NetworkAddress const& addr) { return baseNetwork->isAddressOnThisHost(addr); }
|
||||
virtual bool useObjectSerializer() const { return baseNetwork->useObjectSerializer(); }
|
||||
};
|
||||
|
||||
struct NonserializableThing {};
|
||||
|
|
|
@ -337,7 +337,7 @@ struct Peer : NonCopyable {
|
|||
|
||||
pkt.connectPacketLength = sizeof(pkt) - sizeof(pkt.connectPacketLength);
|
||||
pkt.protocolVersion = currentProtocolVersion;
|
||||
if (g_network->useObjectSerializer()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
pkt.protocolVersion.addObjectSerializerFlag();
|
||||
}
|
||||
pkt.connectionId = transport->transportId;
|
||||
|
@ -654,7 +654,7 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
|
|||
if (receiver) {
|
||||
try {
|
||||
g_currentDeliveryPeerAddress = destination.addresses;
|
||||
if (g_network->useObjectSerializer()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
StringRef data = reader.arenaReadAll();
|
||||
ASSERT(data.size() > 8);
|
||||
ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion()));
|
||||
|
@ -858,7 +858,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
serializer(pktReader, pkt);
|
||||
|
||||
uint64_t connectionId = pkt.connectionId;
|
||||
if(g_network->useObjectSerializer() != pkt.protocolVersion.hasObjectSerializerFlag() ||
|
||||
if((FLOW_KNOBS->USE_OBJECT_SERIALIZER != 0) != pkt.protocolVersion.hasObjectSerializerFlag() ||
|
||||
!pkt.protocolVersion.isCompatible(currentProtocolVersion)) {
|
||||
incompatibleProtocolVersionNewer = pkt.protocolVersion > currentProtocolVersion;
|
||||
NetworkAddress addr = pkt.canonicalRemotePort
|
||||
|
@ -896,8 +896,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
TraceEvent("ConnectionEstablished", conn->getDebugID())
|
||||
.suppressFor(1.0)
|
||||
.detail("Peer", conn->getPeerAddress())
|
||||
.detail("ConnectionId", connectionId)
|
||||
.detail("UseObjectSerializer", g_network->useObjectSerializer());
|
||||
.detail("ConnectionId", connectionId);
|
||||
}
|
||||
|
||||
if(connectionId > 1) {
|
||||
|
@ -1149,7 +1148,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
|
|||
// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
|
||||
|
||||
Standalone<StringRef> copy;
|
||||
if (g_network->useObjectSerializer()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
ObjectWriter wr(AssumeVersion(currentProtocolVersion));
|
||||
what.serializeObjectWriter(wr);
|
||||
copy = wr.toStringRef();
|
||||
|
@ -1198,7 +1197,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
|
|||
|
||||
wr.writeAhead(packetInfoSize , &packetInfoBuffer);
|
||||
wr << destination.token;
|
||||
what.serializePacketWriter(wr, g_network->useObjectSerializer());
|
||||
what.serializePacketWriter(wr, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
pb = wr.finish();
|
||||
len = wr.size() - packetInfoSize;
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ ACTOR template <class T>
|
|||
void networkSender(Future<T> input, Endpoint endpoint) {
|
||||
try {
|
||||
T value = wait(input);
|
||||
if (g_network->useObjectSerializer()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(value), endpoint);
|
||||
} else {
|
||||
FlowTransport::transport().sendUnreliable(SerializeBoolAnd<T>(true, value), endpoint, false);
|
||||
|
@ -42,7 +42,7 @@ void networkSender(Future<T> input, Endpoint endpoint) {
|
|||
} catch (Error& err) {
|
||||
// if (err.code() == error_code_broken_promise) return;
|
||||
ASSERT(err.code() != error_code_actor_cancelled);
|
||||
if (g_network->useObjectSerializer()) {
|
||||
if (FLOW_KNOBS->USE_OBJECT_SERIALIZER) {
|
||||
FlowTransport::transport().sendUnreliable(SerializeSource<ErrorOr<EnsureTable<T>>>(err), endpoint);
|
||||
} else {
|
||||
FlowTransport::transport().sendUnreliable(SerializeBoolAnd<Error>(false, err), endpoint, false);
|
||||
|
|
|
@ -1092,10 +1092,6 @@ public:
|
|||
return primaryTLogsDead || primaryProcessesDead.validate(storagePolicy);
|
||||
}
|
||||
|
||||
virtual bool useObjectSerializer() const {
|
||||
return net2->useObjectSerializer();
|
||||
}
|
||||
|
||||
// The following function will determine if the specified configuration of available and dead processes can allow the cluster to survive
|
||||
virtual bool canKillProcesses(std::vector<ProcessInfo*> const& availableProcesses, std::vector<ProcessInfo*> const& deadProcesses, KillType kt, KillType* newKillType) const
|
||||
{
|
||||
|
@ -1588,10 +1584,10 @@ public:
|
|||
machines.erase(machineId);
|
||||
}
|
||||
|
||||
Sim2(bool objSerializer) : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(TaskPriority::Zero) {
|
||||
Sim2() : time(0.0), taskCount(0), yielded(false), yield_limit(0), currentTaskID(TaskPriority::Zero) {
|
||||
// Not letting currentProcess be NULL eliminates some annoying special cases
|
||||
currentProcess = new ProcessInfo("NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), {NetworkAddress()}, this, "", "");
|
||||
g_network = net2 = newNet2(false, true, objSerializer);
|
||||
g_network = net2 = newNet2(false, true);
|
||||
Net2FileSystem::newFileSystem();
|
||||
check_yield(TaskPriority::Zero);
|
||||
}
|
||||
|
@ -1699,9 +1695,9 @@ public:
|
|||
int yield_limit; // how many more times yield may return false before next returning true
|
||||
};
|
||||
|
||||
void startNewSimulator(bool objSerializer) {
|
||||
void startNewSimulator() {
|
||||
ASSERT( !g_network );
|
||||
g_network = g_pSimulator = new Sim2(objSerializer);
|
||||
g_network = g_pSimulator = new Sim2();
|
||||
g_simulator.connectionFailuresDisableDuration = deterministicRandom()->random01() < 0.5 ? 0 : 1e6;
|
||||
}
|
||||
|
||||
|
|
|
@ -155,7 +155,6 @@ public:
|
|||
virtual bool isAvailable() const = 0;
|
||||
virtual bool datacenterDead(Optional<Standalone<StringRef>> dcId) const = 0;
|
||||
virtual void displayWorkers() const;
|
||||
virtual bool useObjectSerializer() const = 0;
|
||||
|
||||
virtual void addRole(NetworkAddress const& address, std::string const& role) {
|
||||
roleAddresses[address][role] ++;
|
||||
|
@ -341,7 +340,7 @@ private:
|
|||
extern ISimulator* g_pSimulator;
|
||||
#define g_simulator (*g_pSimulator)
|
||||
|
||||
void startNewSimulator(bool useObjectSerializer);
|
||||
void startNewSimulator();
|
||||
|
||||
//Parameters used to simulate disk performance
|
||||
struct DiskParameters : ReferenceCounted<DiskParameters> {
|
||||
|
|
|
@ -2723,6 +2723,11 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
|
|||
}
|
||||
}
|
||||
|
||||
// Failed server should not trigger DD if SS failures are set to be ignored
|
||||
if (!badTeam && self->healthyZone.get().present() && (self->healthyZone.get().get() == ignoreSSFailuresZoneString)) {
|
||||
ASSERT_WE_THINK(serversLeft == self->configuration.storageTeamSize);
|
||||
}
|
||||
|
||||
if( !self->initialFailureReactionDelay.isReady() ) {
|
||||
change.push_back( self->initialFailureReactionDelay );
|
||||
}
|
||||
|
@ -2880,11 +2885,6 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
|
|||
rs.keys = shards[i];
|
||||
rs.priority = maxPriority;
|
||||
|
||||
// Failed server should not trigger DD if SS failures are set to be ignored
|
||||
if (rs.priority == PRIORITY_TEAM_UNHEALTHY) {
|
||||
ASSERT_WE_THINK(!(!badTeam && self->healthyZone.get().present() &&
|
||||
(self->healthyZone.get().get() == ignoreSSFailuresZoneString)));
|
||||
}
|
||||
self->output.send(rs);
|
||||
if(deterministicRandom()->random01() < 0.01) {
|
||||
TraceEvent("SendRelocateToDDQx100", self->distributorId)
|
||||
|
@ -4138,7 +4138,7 @@ static std::set<int> const& normalDataDistributorErrors() {
|
|||
|
||||
ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<AsyncVar<struct ServerDBInfo>> db ) {
|
||||
state Database cx = openDBOnServer(db, TaskPriority::DefaultDelay, true, true);
|
||||
TraceEvent("SnapDataDistributor.SnapReqEnter")
|
||||
TraceEvent("SnapDataDistributor_SnapReqEnter")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
try {
|
||||
|
@ -4152,12 +4152,12 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
}
|
||||
wait(waitForAll(disablePops));
|
||||
|
||||
TraceEvent("SnapDataDistributor.AfterDisableTLogPop")
|
||||
TraceEvent("SnapDataDistributor_AfterDisableTLogPop")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
// snap local storage nodes
|
||||
std::vector<WorkerInterface> storageWorkers = wait(getStorageWorkers(cx, db, true /* localOnly */));
|
||||
TraceEvent("SnapDataDistributor.GotStorageWorkers")
|
||||
TraceEvent("SnapDataDistributor_GotStorageWorkers")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
std::vector<Future<Void>> storageSnapReqs;
|
||||
|
@ -4168,7 +4168,7 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
}
|
||||
wait(waitForAll(storageSnapReqs));
|
||||
|
||||
TraceEvent("SnapDataDistributor.AfterSnapStorage")
|
||||
TraceEvent("SnapDataDistributor_AfterSnapStorage")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
// snap local tlog nodes
|
||||
|
@ -4180,7 +4180,7 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
}
|
||||
wait(waitForAll(tLogSnapReqs));
|
||||
|
||||
TraceEvent("SnapDataDistributor.AfterTLogStorage")
|
||||
TraceEvent("SnapDataDistributor_AfterTLogStorage")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
// enable tlog pop on local tlog nodes
|
||||
|
@ -4192,12 +4192,12 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
}
|
||||
wait(waitForAll(enablePops));
|
||||
|
||||
TraceEvent("SnapDataDistributor.AfterEnableTLogPops")
|
||||
TraceEvent("SnapDataDistributor_AfterEnableTLogPops")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
// snap the coordinators
|
||||
std::vector<WorkerInterface> coordWorkers = wait(getCoordWorkers(cx, db));
|
||||
TraceEvent("SnapDataDistributor.GotCoordWorkers")
|
||||
TraceEvent("SnapDataDistributor_GotCoordWorkers")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
std::vector<Future<Void>> coordSnapReqs;
|
||||
|
@ -4207,11 +4207,11 @@ ACTOR Future<Void> ddSnapCreateCore(DistributorSnapRequest snapReq, Reference<As
|
|||
);
|
||||
}
|
||||
wait(waitForAll(coordSnapReqs));
|
||||
TraceEvent("SnapDataDistributor.AfterSnapCoords")
|
||||
TraceEvent("SnapDataDistributor_AfterSnapCoords")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SnapDataDistributor.SnapReqExit")
|
||||
TraceEvent("SnapDataDistributor_SnapReqExit")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID)
|
||||
.error(e, true /*includeCancelled */);
|
||||
|
|
|
@ -81,6 +81,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
|
|||
init( TLOG_DEGRADED_DELAY_COUNT, 5 );
|
||||
init( TLOG_DEGRADED_DURATION, 5.0 );
|
||||
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
|
||||
init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();
|
||||
|
||||
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
|
||||
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
|
||||
|
|
|
@ -83,6 +83,7 @@ public:
|
|||
int DISK_QUEUE_MAX_TRUNCATE_BYTES; // A truncate larger than this will cause the file to be replaced instead.
|
||||
int TLOG_DEGRADED_DELAY_COUNT;
|
||||
double TLOG_DEGRADED_DURATION;
|
||||
double TXS_POPPED_MAX_DELAY;
|
||||
|
||||
// Data distribution queue
|
||||
double HEALTH_POLL_TIME;
|
||||
|
|
|
@ -61,9 +61,9 @@ Future<Void> tryBecomeLeader( ServerCoordinators const& coordinators,
|
|||
Reference<AsyncVar<Value>> serializedInfo(new AsyncVar<Value>);
|
||||
Future<Void> m = tryBecomeLeaderInternal(
|
||||
coordinators,
|
||||
g_network->useObjectSerializer() ? ObjectWriter::toValue(proposedInterface, IncludeVersion()) : BinaryWriter::toValue(proposedInterface, IncludeVersion()),
|
||||
FLOW_KNOBS->USE_OBJECT_SERIALIZER ? ObjectWriter::toValue(proposedInterface, IncludeVersion()) : BinaryWriter::toValue(proposedInterface, IncludeVersion()),
|
||||
serializedInfo, hasConnected, asyncPriorityInfo);
|
||||
return m || asyncDeserialize(serializedInfo, outKnownLeader, g_network->useObjectSerializer());
|
||||
return m || asyncDeserialize(serializedInfo, outKnownLeader, FLOW_KNOBS->USE_OBJECT_SERIALIZER);
|
||||
}
|
||||
|
||||
#pragma endregion
|
||||
|
|
|
@ -662,6 +662,8 @@ struct ILogSystem {
|
|||
virtual Reference<IPeekCursor> peekTxs( UID dbgid, Version begin, int8_t peekLocality, Version localEnd, bool canDiscardPopped ) = 0;
|
||||
// Same contract as peek(), but only for peeking the txsLocality. It allows specifying a preferred peek locality.
|
||||
|
||||
virtual Future<Version> getTxsPoppedVersion() = 0;
|
||||
|
||||
virtual Version getKnownCommittedVersion() = 0;
|
||||
|
||||
virtual Future<Void> onKnownCommittedVersionChange() = 0;
|
||||
|
|
|
@ -194,6 +194,6 @@ Future<LogSystemDiskQueueAdapter::CommitMessage> LogSystemDiskQueueAdapter::getC
|
|||
return pcm.getFuture();
|
||||
}
|
||||
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality ) {
|
||||
return new LogSystemDiskQueueAdapter( logSystem, peekLocality );
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, Version txsPoppedVersion ) {
|
||||
return new LogSystemDiskQueueAdapter( logSystem, peekLocality, txsPoppedVersion, true );
|
||||
}
|
||||
|
|
|
@ -52,10 +52,10 @@ public:
|
|||
|
||||
// It does, however, peek the specified tag directly at recovery time.
|
||||
|
||||
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, bool recover=true ) : logSystem(logSystem), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(1), recoveryQueueLoc(1), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0), hasDiscardedData(false), totalRecoveredBytes(0) {
|
||||
LogSystemDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, Version txsPoppedVersion, bool recover ) : logSystem(logSystem), peekLocality(peekLocality), enableRecovery(recover), recoveryLoc(txsPoppedVersion), recoveryQueueLoc(txsPoppedVersion), poppedUpTo(0), nextCommit(1), recoveryQueueDataSize(0), peekTypeSwitches(0), hasDiscardedData(false), totalRecoveredBytes(0) {
|
||||
if (enableRecovery) {
|
||||
localityChanged = peekLocality ? peekLocality->onChange() : Never();
|
||||
cursor = logSystem->peekTxs( UID(), 1, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion, true );
|
||||
cursor = logSystem->peekTxs( UID(), txsPoppedVersion, peekLocality ? peekLocality->get().primaryLocality : tagLocalityInvalid, peekLocality ? peekLocality->get().knownCommittedVersion : invalidVersion, true );
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,6 +115,6 @@ private:
|
|||
friend class LogSystemDiskQueueAdapterImpl;
|
||||
};
|
||||
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality );
|
||||
LogSystemDiskQueueAdapter* openDiskQueueAdapter( Reference<ILogSystem> logSystem, Reference<AsyncVar<PeekTxsInfo>> peekLocality, Version txsPoppedVersion );
|
||||
|
||||
#endif
|
||||
|
|
|
@ -1042,7 +1042,7 @@ ACTOR Future<Void> bufferedGetMore( ILogSystem::BufferedCursor* self, TaskPriori
|
|||
cursor->advanceTo(self->messageVersion);
|
||||
}
|
||||
self->messageIndex = self->messages.size();
|
||||
if (self->messages.size() > 0 && self->messages[self->messages.size()-1].version < self->messageVersion) {
|
||||
if (self->messages.size() > 0 && self->messages[self->messages.size()-1].version.version < self->poppedVersion) {
|
||||
self->hasNextMessage = false;
|
||||
} else {
|
||||
auto iter = std::lower_bound(self->messages.begin(), self->messages.end(),
|
||||
|
|
|
@ -1455,7 +1455,7 @@ ACTOR Future<Void> lastCommitUpdater(ProxyCommitData* self, PromiseStream<Future
|
|||
}
|
||||
|
||||
ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* commitData) {
|
||||
TraceEvent("SnapMasterProxy.SnapReqEnter")
|
||||
TraceEvent("SnapMasterProxy_SnapReqEnter")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
try {
|
||||
|
@ -1463,7 +1463,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
ExecCmdValueString execArg(snapReq.snapPayload);
|
||||
StringRef binPath = execArg.getBinaryPath();
|
||||
if (!isWhitelisted(commitData->whitelistedBinPathVec, binPath)) {
|
||||
TraceEvent("SnapMasterProxy.WhiteListCheckFailed")
|
||||
TraceEvent("SnapMasterProxy_WhiteListCheckFailed")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
throw transaction_not_permitted();
|
||||
|
@ -1475,7 +1475,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
// Currently, snapshot of old tlog generation is not
|
||||
// supported and hence failing the snapshot request until
|
||||
// cluster is fully_recovered.
|
||||
TraceEvent("SnapMasterProxy.ClusterNotFullyRecovered")
|
||||
TraceEvent("SnapMasterProxy_ClusterNotFullyRecovered")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
throw cluster_not_fully_recovered();
|
||||
|
@ -1490,7 +1490,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
// FIXME: logAntiQuorum not supported, remove it later,
|
||||
// In version2, we probably don't need this limtiation, but this needs to be tested.
|
||||
if (logAntiQuorum > 0) {
|
||||
TraceEvent("SnapMasterProxy.LogAnitQuorumNotSupported")
|
||||
TraceEvent("SnapMasterProxy_LogAnitQuorumNotSupported")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
throw txn_exec_log_anti_quorum();
|
||||
|
@ -1506,7 +1506,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
try {
|
||||
wait(throwErrorOr(ddSnapReq));
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SnapMasterProxy.DDSnapResponseError")
|
||||
TraceEvent("SnapMasterProxy_DDSnapResponseError")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID)
|
||||
.error(e, true /*includeCancelled*/ );
|
||||
|
@ -1514,7 +1514,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
}
|
||||
snapReq.reply.send(Void());
|
||||
} catch (Error& e) {
|
||||
TraceEvent("SnapMasterProxy.SnapReqError")
|
||||
TraceEvent("SnapMasterProxy_SnapReqError")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID)
|
||||
.error(e, true /*includeCancelled*/);
|
||||
|
@ -1524,7 +1524,7 @@ ACTOR Future<Void> proxySnapCreate(ProxySnapRequest snapReq, ProxyCommitData* co
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
TraceEvent("SnapMasterProxy.SnapReqExit")
|
||||
TraceEvent("SnapMasterProxy_SnapReqExit")
|
||||
.detail("SnapPayload", snapReq.snapPayload)
|
||||
.detail("SnapUID", snapReq.snapUID);
|
||||
return Void();
|
||||
|
@ -1576,7 +1576,7 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
r->value().emplace_back(0,0);
|
||||
|
||||
commitData.logSystem = ILogSystem::fromServerDBInfo(proxy.id(), commitData.db->get(), false, addActor);
|
||||
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, Reference<AsyncVar<PeekTxsInfo>>(), false);
|
||||
commitData.logAdapter = new LogSystemDiskQueueAdapter(commitData.logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
|
||||
commitData.txnStateStore = keyValueStoreLogSystem(commitData.logAdapter, proxy.id(), 2e9, true, true, true);
|
||||
createWhitelistBinPathVec(whitelistBinPaths, commitData.whitelistedBinPathVec);
|
||||
|
||||
|
|
|
@ -906,6 +906,41 @@ namespace oldTLog_4_6 {
|
|||
|
||||
state Version endVersion = logData->version.get() + 1;
|
||||
|
||||
Version poppedVer = poppedVersion(logData, oldTag);
|
||||
if(poppedVer > req.begin) {
|
||||
TLogPeekReply rep;
|
||||
rep.maxKnownVersion = logData->version.get();
|
||||
rep.minKnownCommittedVersion = 0;
|
||||
rep.popped = poppedVer;
|
||||
rep.end = poppedVer;
|
||||
rep.onlySpilled = false;
|
||||
|
||||
if(req.sequence.present()) {
|
||||
auto& trackerData = self->peekTracker[peekId];
|
||||
auto& sequenceData = trackerData.sequence_version[sequence+1];
|
||||
trackerData.lastUpdate = now();
|
||||
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
|
||||
req.reply.sendError(timed_out());
|
||||
if (!sequenceData.isSet())
|
||||
sequenceData.sendError(timed_out());
|
||||
return Void();
|
||||
}
|
||||
if(sequenceData.isSet()) {
|
||||
if(sequenceData.getFuture().get() != rep.end) {
|
||||
TEST(true); //tlog peek second attempt ended at a different version
|
||||
req.reply.sendError(timed_out());
|
||||
return Void();
|
||||
}
|
||||
} else {
|
||||
sequenceData.send(rep.end);
|
||||
}
|
||||
rep.begin = req.begin;
|
||||
}
|
||||
|
||||
req.reply.send( rep );
|
||||
return Void();
|
||||
}
|
||||
|
||||
//grab messages from disk
|
||||
//TraceEvent("TLogPeekMessages", self->dbgid).detail("ReqBeginEpoch", req.begin.epoch).detail("ReqBeginSeq", req.begin.sequence).detail("Epoch", self->epoch()).detail("PersistentDataSeq", self->persistentDataSequence).detail("Tag1", req.tag1).detail("Tag2", req.tag2);
|
||||
if( req.begin <= logData->persistentDataDurableVersion ) {
|
||||
|
@ -948,19 +983,13 @@ namespace oldTLog_4_6 {
|
|||
//TraceEvent("TLogPeekResults", self->dbgid).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress()).detail("MessageBytes", messages.getLength()).detail("NextEpoch", next_pos.epoch).detail("NextSeq", next_pos.sequence).detail("NowSeq", self->sequence.getNextSequence());
|
||||
}
|
||||
|
||||
Version poppedVer = poppedVersion(logData, oldTag);
|
||||
|
||||
TLogPeekReply reply;
|
||||
reply.maxKnownVersion = logData->version.get();
|
||||
reply.minKnownCommittedVersion = 0;
|
||||
reply.onlySpilled = false;
|
||||
if(poppedVer > req.begin) {
|
||||
reply.popped = poppedVer;
|
||||
reply.end = poppedVer;
|
||||
} else {
|
||||
reply.messages = messages.toValue();
|
||||
reply.end = endVersion;
|
||||
}
|
||||
reply.messages = messages.toValue();
|
||||
reply.end = endVersion;
|
||||
|
||||
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
|
||||
|
||||
if(req.sequence.present()) {
|
||||
|
|
|
@ -1056,6 +1056,65 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Version> getPoppedFromTLog( Reference<AsyncVar<OptionalInterface<TLogInterface>>> log, Tag tag ) {
|
||||
loop {
|
||||
choose {
|
||||
when( TLogPeekReply rep = wait( log->get().present() ? brokenPromiseToNever(log->get().interf().peekMessages.getReply(TLogPeekRequest(-1, tag, false, false))) : Never() ) ) {
|
||||
ASSERT(rep.popped.present());
|
||||
return rep.popped.get();
|
||||
}
|
||||
when( wait( log->onChange() ) ) {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Version> getPoppedTxs(TagPartitionedLogSystem* self) {
|
||||
state std::vector<std::vector<Future<Version>>> poppedFutures;
|
||||
state std::vector<Future<Void>> poppedReady;
|
||||
if(self->tLogs.size()) {
|
||||
poppedFutures.push_back( std::vector<Future<Version>>() );
|
||||
for(auto& it : self->tLogs) {
|
||||
for(auto& log : it->logServers) {
|
||||
poppedFutures.back().push_back(getPoppedFromTLog(log, self->tLogs[0]->tLogVersion < TLogVersion::V4 ? txsTag : Tag(tagLocalityTxs, 0)));
|
||||
}
|
||||
}
|
||||
poppedReady.push_back(waitForAny(poppedFutures.back()));
|
||||
}
|
||||
|
||||
for(auto& old : self->oldLogData) {
|
||||
if(old.tLogs.size()) {
|
||||
poppedFutures.push_back( std::vector<Future<Version>>() );
|
||||
for(auto& it : old.tLogs) {
|
||||
for(auto& log : it->logServers) {
|
||||
poppedFutures.back().push_back(getPoppedFromTLog(log, old.tLogs[0]->tLogVersion < TLogVersion::V4 ? txsTag : Tag(tagLocalityTxs, 0)));
|
||||
}
|
||||
}
|
||||
poppedReady.push_back(waitForAny(poppedFutures.back()));
|
||||
}
|
||||
}
|
||||
|
||||
state Future<Void> maxGetPoppedDuration = delay(SERVER_KNOBS->TXS_POPPED_MAX_DELAY);
|
||||
wait( waitForAll(poppedReady) || maxGetPoppedDuration );
|
||||
|
||||
if(maxGetPoppedDuration.isReady()) {
|
||||
TraceEvent(SevWarnAlways, "PoppedTxsNotReady", self->dbgid);
|
||||
}
|
||||
|
||||
Version maxPopped = 1;
|
||||
for(auto &it : poppedFutures) {
|
||||
for(auto &v : it) {
|
||||
if(v.isReady()) {
|
||||
maxPopped = std::max(maxPopped, v.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
return maxPopped;
|
||||
}
|
||||
|
||||
virtual Future<Version> getTxsPoppedVersion() {
|
||||
return getPoppedTxs(this);
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> confirmEpochLive_internal(Reference<LogSet> logSet, Optional<UID> debugID) {
|
||||
state vector<Future<Void>> alive;
|
||||
int numPresent = 0;
|
||||
|
|
|
@ -82,57 +82,12 @@
|
|||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
enum {
|
||||
OPT_CONNFILE,
|
||||
OPT_SEEDCONNFILE,
|
||||
OPT_SEEDCONNSTRING,
|
||||
OPT_ROLE,
|
||||
OPT_LISTEN,
|
||||
OPT_PUBLICADDR,
|
||||
OPT_DATAFOLDER,
|
||||
OPT_LOGFOLDER,
|
||||
OPT_PARENTPID,
|
||||
OPT_NEWCONSOLE,
|
||||
OPT_NOBOX,
|
||||
OPT_TESTFILE,
|
||||
OPT_RESTARTING,
|
||||
OPT_RESTORING,
|
||||
OPT_RANDOMSEED,
|
||||
OPT_KEY,
|
||||
OPT_MEMLIMIT,
|
||||
OPT_STORAGEMEMLIMIT,
|
||||
OPT_CACHEMEMLIMIT,
|
||||
OPT_MACHINEID,
|
||||
OPT_DCID,
|
||||
OPT_MACHINE_CLASS,
|
||||
OPT_BUGGIFY,
|
||||
OPT_VERSION,
|
||||
OPT_CRASHONERROR,
|
||||
OPT_HELP,
|
||||
OPT_NETWORKIMPL,
|
||||
OPT_NOBUFSTDOUT,
|
||||
OPT_BUFSTDOUTERR,
|
||||
OPT_TRACECLOCK,
|
||||
OPT_NUMTESTERS,
|
||||
OPT_DEVHELP,
|
||||
OPT_ROLLSIZE,
|
||||
OPT_MAXLOGS,
|
||||
OPT_MAXLOGSSIZE,
|
||||
OPT_KNOB,
|
||||
OPT_TESTSERVERS,
|
||||
OPT_TEST_ON_SERVERS,
|
||||
OPT_METRICSCONNFILE,
|
||||
OPT_METRICSPREFIX,
|
||||
OPT_LOGGROUP,
|
||||
OPT_LOCALITY,
|
||||
OPT_IO_TRUST_SECONDS,
|
||||
OPT_IO_TRUST_WARN_ONLY,
|
||||
OPT_FILESYSTEM,
|
||||
OPT_PROFILER_RSS_SIZE,
|
||||
OPT_KVFILE,
|
||||
OPT_TRACE_FORMAT,
|
||||
OPT_USE_OBJECT_SERIALIZER,
|
||||
OPT_WHITELIST_BINPATH,
|
||||
OPT_BLOB_CREDENTIAL_FILE
|
||||
OPT_CONNFILE, OPT_SEEDCONNFILE, OPT_SEEDCONNSTRING, OPT_ROLE, OPT_LISTEN, OPT_PUBLICADDR, OPT_DATAFOLDER, OPT_LOGFOLDER, OPT_PARENTPID, OPT_NEWCONSOLE,
|
||||
OPT_NOBOX, OPT_TESTFILE, OPT_RESTARTING, OPT_RESTORING, OPT_RANDOMSEED, OPT_KEY, OPT_MEMLIMIT, OPT_STORAGEMEMLIMIT, OPT_CACHEMEMLIMIT, OPT_MACHINEID,
|
||||
OPT_DCID, OPT_MACHINE_CLASS, OPT_BUGGIFY, OPT_VERSION, OPT_CRASHONERROR, OPT_HELP, OPT_NETWORKIMPL, OPT_NOBUFSTDOUT, OPT_BUFSTDOUTERR, OPT_TRACECLOCK,
|
||||
OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
|
||||
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
|
||||
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE
|
||||
};
|
||||
|
||||
CSimpleOpt::SOption g_rgOptions[] = {
|
||||
|
@ -210,8 +165,6 @@ CSimpleOpt::SOption g_rgOptions[] = {
|
|||
{ OPT_IO_TRUST_SECONDS, "--io_trust_seconds", SO_REQ_SEP },
|
||||
{ OPT_IO_TRUST_WARN_ONLY, "--io_trust_warn_only", SO_NONE },
|
||||
{ OPT_TRACE_FORMAT , "--trace_format", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "-S", SO_REQ_SEP },
|
||||
{ OPT_USE_OBJECT_SERIALIZER, "--object-serializer", SO_REQ_SEP },
|
||||
{ OPT_WHITELIST_BINPATH, "--whitelist_binpath", SO_REQ_SEP },
|
||||
|
||||
#ifndef TLS_DISABLED
|
||||
|
@ -648,10 +601,6 @@ static void printUsage( const char *name, bool devhelp ) {
|
|||
" Machine class (valid options are storage, transaction,\n"
|
||||
" resolution, proxy, master, test, unset, stateless, log, router,\n"
|
||||
" and cluster_controller).\n");
|
||||
printf(" -S ON|OFF, --object-serializer ON|OFF\n"
|
||||
" Use object serializer for sending messages. The object serializer\n"
|
||||
" is currently a beta feature and it allows fdb processes to talk to\n"
|
||||
" each other even if they don't have the same version\n");
|
||||
#ifndef TLS_DISABLED
|
||||
printf(TLS_HELP);
|
||||
#endif
|
||||
|
@ -1017,7 +966,6 @@ int main(int argc, char* argv[]) {
|
|||
std::vector<std::string> blobCredentials; // used for fast restore workers
|
||||
// const char *blobCredsFromENV = nullptr;
|
||||
uint64_t rsssize = -1;
|
||||
bool useObjectSerializer = true;
|
||||
|
||||
if( argc == 1 ) {
|
||||
printUsage(argv[0], false);
|
||||
|
@ -1398,21 +1346,6 @@ int main(int argc, char* argv[]) {
|
|||
fprintf(stderr, "WARNING: Unrecognized trace format `%s'\n", args.OptionArg());
|
||||
}
|
||||
break;
|
||||
case OPT_USE_OBJECT_SERIALIZER:
|
||||
{
|
||||
std::string s = args.OptionArg();
|
||||
std::transform(s.begin(), s.end(), s.begin(), ::tolower);
|
||||
if (s == "on" || s == "true" || s == "1") {
|
||||
useObjectSerializer = true;
|
||||
} else if (s == "off" || s == "false" || s == "0") {
|
||||
useObjectSerializer = false;
|
||||
} else {
|
||||
fprintf(stderr, "ERROR: Could not parse object serializer option: `%s'\n", s.c_str());
|
||||
printHelpTeaser(argv[0]);
|
||||
flushAndExit(FDB_EXIT_ERROR);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case OPT_WHITELIST_BINPATH:
|
||||
whitelistBinPaths = args.OptionArg();
|
||||
break;
|
||||
|
@ -1629,10 +1562,10 @@ int main(int argc, char* argv[]) {
|
|||
|
||||
if (role == Simulation || role == CreateTemplateDatabase) {
|
||||
//startOldSimulator();
|
||||
startNewSimulator(useObjectSerializer);
|
||||
startNewSimulator();
|
||||
openTraceFile(NetworkAddress(), rollsize, maxLogsSize, logFolder, "trace", logGroup);
|
||||
} else {
|
||||
g_network = newNet2(useThreadPool, true, useObjectSerializer);
|
||||
g_network = newNet2(useThreadPool, true);
|
||||
FlowTransport::createInstance(false, 1);
|
||||
|
||||
const bool expectsPublicAddress = (role == FDBD || role == NetworkTestServer || role == Restore);
|
||||
|
|
|
@ -618,7 +618,7 @@ ACTOR Future<Void> updateLocalityForDcId(Optional<Key> dcId, Reference<ILogSyste
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem ) {
|
||||
ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem, Version txsPoppedVersion ) {
|
||||
state Reference<AsyncVar<PeekTxsInfo>> myLocality = Reference<AsyncVar<PeekTxsInfo>>( new AsyncVar<PeekTxsInfo>(PeekTxsInfo(tagLocalityInvalid,tagLocalityInvalid,invalidVersion) ) );
|
||||
state Future<Void> localityUpdater = updateLocalityForDcId(self->myInterface.locality.dcId(), oldLogSystem, myLocality);
|
||||
// Peek the txnStateTag in oldLogSystem and recover self->txnStateStore
|
||||
|
@ -630,7 +630,7 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
|
|||
|
||||
// Recover transaction state store
|
||||
if(self->txnStateStore) self->txnStateStore->close();
|
||||
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, myLocality );
|
||||
self->txnStateLogAdapter = openDiskQueueAdapter( oldLogSystem, myLocality, txsPoppedVersion );
|
||||
self->txnStateStore = keyValueStoreLogSystem( self->txnStateLogAdapter, self->dbgid, self->memoryLimit, false, false, true );
|
||||
|
||||
// Versionstamped operations (particularly those applied from DR) define a minimum commit version
|
||||
|
@ -802,7 +802,7 @@ void updateConfigForForcedRecovery(Reference<MasterData> self, vector<Standalone
|
|||
initialConfChanges->push_back(regionCommit);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem, vector<StorageServerInterface>* seedServers, vector<Standalone<CommitTransactionRef>>* initialConfChanges ) {
|
||||
ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem> oldLogSystem, vector<StorageServerInterface>* seedServers, vector<Standalone<CommitTransactionRef>>* initialConfChanges, Future<Version> poppedTxsVersion ) {
|
||||
TraceEvent("MasterRecoveryState", self->dbgid)
|
||||
.detail("StatusCode", RecoveryStatus::reading_transaction_system_state)
|
||||
.detail("Status", RecoveryStatus::names[RecoveryStatus::reading_transaction_system_state])
|
||||
|
@ -812,7 +812,8 @@ ACTOR Future<Void> recoverFrom( Reference<MasterData> self, Reference<ILogSystem
|
|||
if(BUGGIFY)
|
||||
wait( delay(10.0) );
|
||||
|
||||
wait( readTransactionSystemState( self, oldLogSystem ) );
|
||||
Version txsPoppedVersion = wait( poppedTxsVersion );
|
||||
wait( readTransactionSystemState( self, oldLogSystem, txsPoppedVersion ) );
|
||||
for (auto& itr : *initialConfChanges) {
|
||||
for(auto& m : itr.mutations) {
|
||||
self->configuration.applyMutation( m );
|
||||
|
@ -1249,6 +1250,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
state vector<Standalone<CommitTransactionRef>> initialConfChanges;
|
||||
state Future<Void> logChanges;
|
||||
state Future<Void> minRecoveryDuration;
|
||||
state Future<Version> poppedTxsVersion;
|
||||
|
||||
loop {
|
||||
Reference<ILogSystem> oldLogSystem = oldLogSystems->get();
|
||||
|
@ -1256,6 +1258,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
logChanges = triggerUpdates(self, oldLogSystem);
|
||||
if(!minRecoveryDuration.isValid()) {
|
||||
minRecoveryDuration = delay(SERVER_KNOBS->ENFORCED_MIN_RECOVERY_DURATION);
|
||||
poppedTxsVersion = oldLogSystem->getTxsPoppedVersion();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1263,7 +1266,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
|
|||
self->registrationTrigger.trigger();
|
||||
|
||||
choose {
|
||||
when (wait( oldLogSystem ? recoverFrom(self, oldLogSystem, &seedServers, &initialConfChanges) : Never() )) { reg.cancel(); break; }
|
||||
when (wait( oldLogSystem ? recoverFrom(self, oldLogSystem, &seedServers, &initialConfChanges, poppedTxsVersion) : Never() )) { reg.cancel(); break; }
|
||||
when (wait( oldLogSystems->onChange() )) {}
|
||||
when (wait( reg )) { throw internal_error(); }
|
||||
when (wait( recoverAndEndEpoch )) {}
|
||||
|
|
|
@ -35,16 +35,21 @@ static std::set<int> const& normalAttritionErrors() {
|
|||
return s;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> resetHealthyZoneAfter(Database cx, double duration) {
|
||||
ACTOR Future<bool> ignoreSSFailuresForDuration(Database cx, double duration) {
|
||||
// duration doesn't matter since this won't timeout
|
||||
TraceEvent("IgnoreSSFailureStart");
|
||||
bool _ = wait(setHealthyZone(cx, ignoreSSFailuresZoneString, 0));
|
||||
TraceEvent("IgnoreSSFailureWait");
|
||||
wait(delay(duration));
|
||||
TraceEvent("IgnoreSSFailureClear");
|
||||
state Transaction tr(cx);
|
||||
state Future<Void> delayF = delay(duration);
|
||||
loop {
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
|
||||
wait(delayF);
|
||||
tr.clear(healthyZoneKey);
|
||||
wait(tr.commit());
|
||||
return Void();
|
||||
TraceEvent("IgnoreSSFailureComplete");
|
||||
return true;
|
||||
} catch (Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
|
@ -61,6 +66,7 @@ struct MachineAttritionWorkload : TestWorkload {
|
|||
bool replacement;
|
||||
bool waitForVersion;
|
||||
bool allowFaultInjection;
|
||||
Future<bool> ignoreSSFailures;
|
||||
|
||||
// This is set in setup from the list of workers when the cluster is started
|
||||
std::vector<LocalityData> machines;
|
||||
|
@ -78,6 +84,7 @@ struct MachineAttritionWorkload : TestWorkload {
|
|||
replacement = getOption( options, LiteralStringRef("replacement"), reboot && deterministicRandom()->random01() < 0.5 );
|
||||
waitForVersion = getOption( options, LiteralStringRef("waitForVersion"), false );
|
||||
allowFaultInjection = getOption( options, LiteralStringRef("allowFaultInjection"), true );
|
||||
ignoreSSFailures = true;
|
||||
}
|
||||
|
||||
static vector<ISimulator::ProcessInfo*> getServers() {
|
||||
|
@ -121,7 +128,7 @@ struct MachineAttritionWorkload : TestWorkload {
|
|||
throw please_reboot();
|
||||
return Void();
|
||||
}
|
||||
virtual Future<bool> check( Database const& cx ) { return true; }
|
||||
virtual Future<bool> check( Database const& cx ) { return ignoreSSFailures; }
|
||||
virtual void getMetrics( vector<PerfMetric>& m ) {
|
||||
}
|
||||
|
||||
|
@ -185,7 +192,6 @@ struct MachineAttritionWorkload : TestWorkload {
|
|||
|
||||
// decide on a machine to kill
|
||||
state LocalityData targetMachine = self->machines.back();
|
||||
state Future<Void> resetHealthyZone = Future<Void>(Void());
|
||||
if(BUGGIFY_WITH_PROB(0.01)) {
|
||||
TEST(true); //Marked a zone for maintenance before killing it
|
||||
bool _ =
|
||||
|
@ -193,9 +199,7 @@ struct MachineAttritionWorkload : TestWorkload {
|
|||
// }
|
||||
} else if (BUGGIFY_WITH_PROB(0.005)) {
|
||||
TEST(true); // Disable DD for all storage server failures
|
||||
bool _ = wait(setHealthyZone(cx, ignoreSSFailuresZoneString,
|
||||
0)); // duration doesn't matter since this won't timeout
|
||||
resetHealthyZone = resetHealthyZoneAfter(cx, deterministicRandom()->random01() * 5);
|
||||
self->ignoreSSFailures = ignoreSSFailuresForDuration(cx, deterministicRandom()->random01() * 5);
|
||||
}
|
||||
|
||||
TraceEvent("Assassination").detail("TargetMachine", targetMachine.toString())
|
||||
|
@ -226,7 +230,7 @@ struct MachineAttritionWorkload : TestWorkload {
|
|||
if(!self->replacement)
|
||||
self->machines.pop_back();
|
||||
|
||||
wait(delay(meanDelay - delayBeforeKill) && resetHealthyZone);
|
||||
wait(delay(meanDelay - delayBeforeKill) && success(self->ignoreSSFailures));
|
||||
|
||||
delayBeforeKill = deterministicRandom()->random01() * meanDelay;
|
||||
TraceEvent("WorkerKillAfterMeanDelay").detail("DelayBeforeKill", delayBeforeKill);
|
||||
|
|
|
@ -122,7 +122,7 @@ public: // workload functions
|
|||
// read the key SnapFailedTLog.$UID
|
||||
loop {
|
||||
try {
|
||||
Standalone<StringRef> keyStr = snapTestFailStatus.withSuffix(StringRef(self->snapUID.toString()));
|
||||
Standalone<StringRef> keyStr = LiteralStringRef("\xff/SnapTestFailStatus/").withSuffix(StringRef(self->snapUID.toString()));
|
||||
TraceEvent("TestKeyStr").detail("Value", keyStr);
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
Optional<Value> val = wait(tr.get(keyStr));
|
||||
|
|
|
@ -68,6 +68,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
|
|||
init( RECONNECTION_TIME_GROWTH_RATE, 1.2 );
|
||||
init( RECONNECTION_RESET_TIME, 5.0 );
|
||||
init( CONNECTION_ACCEPT_DELAY, 0.01 );
|
||||
init( USE_OBJECT_SERIALIZER, 1 );
|
||||
init( TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY, 5.0 );
|
||||
init( TOO_MANY_CONNECTIONS_CLOSED_TIMEOUT, 20.0 );
|
||||
|
||||
|
|
|
@ -88,6 +88,7 @@ public:
|
|||
double RECONNECTION_TIME_GROWTH_RATE;
|
||||
double RECONNECTION_RESET_TIME;
|
||||
double CONNECTION_ACCEPT_DELAY;
|
||||
int USE_OBJECT_SERIALIZER;
|
||||
|
||||
int TLS_CERT_REFRESH_DELAY_SECONDS;
|
||||
|
||||
|
|
|
@ -111,7 +111,7 @@ thread_local INetwork* thread_network = 0;
|
|||
class Net2 sealed : public INetwork, public INetworkConnections {
|
||||
|
||||
public:
|
||||
Net2(bool useThreadPool, bool useMetrics, bool useObjectSerializer);
|
||||
Net2(bool useThreadPool, bool useMetrics);
|
||||
void run();
|
||||
void initMetrics();
|
||||
|
||||
|
@ -148,11 +148,9 @@ public:
|
|||
|
||||
virtual flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; }
|
||||
virtual void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; }
|
||||
virtual bool useObjectSerializer() const { return _useObjectSerializer; }
|
||||
std::vector<flowGlobalType> globals;
|
||||
|
||||
bool useThreadPool;
|
||||
bool _useObjectSerializer = false;
|
||||
//private:
|
||||
|
||||
ASIOReactor reactor;
|
||||
|
@ -483,9 +481,8 @@ struct PromiseTask : public Task, public FastAllocated<PromiseTask> {
|
|||
}
|
||||
};
|
||||
|
||||
Net2::Net2(bool useThreadPool, bool useMetrics, bool useObjectSerializer)
|
||||
Net2::Net2(bool useThreadPool, bool useMetrics)
|
||||
: useThreadPool(useThreadPool),
|
||||
_useObjectSerializer(useObjectSerializer),
|
||||
network(this),
|
||||
reactor(this),
|
||||
stopped(false),
|
||||
|
@ -1027,9 +1024,9 @@ void ASIOReactor::wake() {
|
|||
|
||||
} // namespace net2
|
||||
|
||||
INetwork* newNet2(bool useThreadPool, bool useMetrics, bool useObjectSerializer) {
|
||||
INetwork* newNet2(bool useThreadPool, bool useMetrics) {
|
||||
try {
|
||||
N2::g_net2 = new N2::Net2(useThreadPool, useMetrics, useObjectSerializer);
|
||||
N2::g_net2 = new N2::Net2(useThreadPool, useMetrics);
|
||||
}
|
||||
catch(boost::system::system_error e) {
|
||||
TraceEvent("Net2InitError").detail("Message", e.what());
|
||||
|
|
|
@ -101,7 +101,7 @@ From 6.1, `wait()` on `Void` actors shouldn't assign the resulting value. So, th
|
|||
|
||||
```c++
|
||||
Future<Void> asyncTask(); //defined elsewhere
|
||||
wait(asyncTask());
|
||||
Void _ = _wait(asyncTask());
|
||||
```
|
||||
|
||||
becomes
|
||||
|
|
|
@ -372,7 +372,7 @@ typedef NetworkAddressList (*NetworkAddressesFuncPtr)();
|
|||
|
||||
class INetwork;
|
||||
extern INetwork* g_network;
|
||||
extern INetwork* newNet2(bool useThreadPool = false, bool useMetrics = false, bool useObjectSerializer = false);
|
||||
extern INetwork* newNet2(bool useThreadPool = false, bool useMetrics = false);
|
||||
|
||||
class INetwork {
|
||||
public:
|
||||
|
@ -447,9 +447,6 @@ public:
|
|||
virtual bool isAddressOnThisHost( NetworkAddress const& addr ) = 0;
|
||||
// Returns true if it is reasonably certain that a connection to the given address would be a fast loopback connection
|
||||
|
||||
virtual bool useObjectSerializer() const = 0;
|
||||
// Whether or not the object serializer should be used when sending packets
|
||||
|
||||
// Shorthand for transport().getLocalAddress()
|
||||
static NetworkAddress getLocalAddress()
|
||||
{
|
||||
|
|
|
@ -279,9 +279,6 @@ def run_simulation_test(basedir, options):
|
|||
if options.testtype == 'test':
|
||||
pargs.append('-C')
|
||||
pargs.append(os.path.join(args.builddir, 'fdb.cluster'))
|
||||
else:
|
||||
pargs.append('-S')
|
||||
pargs.append('on' if seed % 2 == 0 else 'off')
|
||||
td = TestDirectory(basedir)
|
||||
if options.buggify:
|
||||
pargs.append('-b')
|
||||
|
|
Loading…
Reference in New Issue