Merge pull request #3016 from etschannen/master
Merge release 6.2 into master
commit 3fb91e8f0c
@@ -304,7 +304,7 @@ func (o DatabaseOptions) SetTransactionTimeout(param int64) error {
 	return o.setOpt(500, int64ToBytes(param))
 }
 
-// Set a timeout in milliseconds which, when elapsed, will cause a transaction automatically to be cancelled. This sets the ``retry_limit`` option of each transaction created by this database. See the transaction option description for more information.
+// Set a maximum number of retries after which additional calls to ``onError`` will throw the most recently seen error code. This sets the ``retry_limit`` option of each transaction created by this database. See the transaction option description for more information.
 //
 // Parameter: number of times to retry
 func (o DatabaseOptions) SetTransactionRetryLimit(param int64) error {
@@ -330,7 +330,7 @@ func (o DatabaseOptions) SetTransactionCausalReadRisky() error {
 	return o.setOpt(504, nil)
 }
 
-// Addresses returned by get_addresses_for_key include the port when enabled. As of api version 700, this option is enabled by default and setting this has no effect.
+// Addresses returned by get_addresses_for_key include the port when enabled. As of api version 630, this option is enabled by default and setting this has no effect.
 func (o DatabaseOptions) SetTransactionIncludePortInAddress() error {
 	return o.setOpt(505, nil)
 }
@@ -350,7 +350,7 @@ func (o TransactionOptions) SetCausalReadDisable() error {
 	return o.setOpt(21, nil)
 }
 
-// Addresses returned by get_addresses_for_key include the port when enabled. As of api version 700, this option is enabled by default and setting this has no effect.
+// Addresses returned by get_addresses_for_key include the port when enabled. As of api version 630, this option is enabled by default and setting this has no effect.
 func (o TransactionOptions) SetIncludePortInAddress() error {
 	return o.setOpt(23, nil)
 }
@@ -429,7 +429,7 @@ func (o TransactionOptions) SetDebugTransactionIdentifier(param string) error {
 	return o.setOpt(403, []byte(param))
 }
 
-// Enables tracing for this transaction and logs results to the client trace logs. The DEBUG_TRANSACTION_IDENTIFIER option must be set before using this option, and client trace logging must be enabled and to get log output.
+// Enables tracing for this transaction and logs results to the client trace logs. The DEBUG_TRANSACTION_IDENTIFIER option must be set before using this option, and client trace logging must be enabled to get log output.
 func (o TransactionOptions) SetLogTransaction() error {
 	return o.setOpt(404, nil)
 }
@@ -479,7 +479,7 @@ func (o TransactionOptions) SetSnapshotRywDisable() error {
 	return o.setOpt(601, nil)
 }
 
-// The transaction can read and write to locked databases, and is resposible for checking that it took the lock.
+// The transaction can read and write to locked databases, and is responsible for checking that it took the lock.
 func (o TransactionOptions) SetLockAware() error {
 	return o.setOpt(700, nil)
 }
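The comment fixes above only correct the Go documentation for existing options. As a rough, non-authoritative sketch (not part of this commit), database-level defaults such as the retry limit and timeout might be set from application code like this, assuming the standard Go binding entry points (`fdb.MustAPIVersion`, `fdb.MustOpenDefault`); the API version and values are placeholders:

```go
package main

import (
	"log"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func main() {
	fdb.MustAPIVersion(620)
	db := fdb.MustOpenDefault()

	// Every transaction created by this database retries at most 3 times;
	// after that, onError surfaces the most recently seen error code.
	if err := db.Options().SetTransactionRetryLimit(3); err != nil {
		log.Fatal(err)
	}
	// Cancel transactions that run longer than 5000 ms.
	if err := db.Options().SetTransactionTimeout(5000); err != nil {
		log.Fatal(err)
	}
}
```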
@@ -410,6 +410,9 @@ func (t *transaction) getApproximateSize() FutureInt64 {
 	}
 }
 
+// Returns a future that is the approximate transaction size so far in this
+// transaction, which is the summation of the estimated size of mutations,
+// read conflict ranges, and write conflict ranges.
 func (t Transaction) GetApproximateSize() FutureInt64 {
 	return t.getApproximateSize()
 }
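A minimal usage sketch for the `GetApproximateSize` call documented by the added comment; the key, value, and API version below are illustrative only:

```go
package main

import (
	"log"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func main() {
	fdb.MustAPIVersion(620)
	db := fdb.MustOpenDefault()

	size, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Set(fdb.Key("hello"), []byte("world"))
		// Estimated bytes of mutations, read conflict ranges, and write
		// conflict ranges accumulated so far in this transaction.
		return tr.GetApproximateSize().MustGet(), nil
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("approximate transaction size: %v bytes", size)
}
```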
@@ -0,0 +1,207 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   [standard Apache License 2.0 terms and conditions, sections 1-9, appendix, and boilerplate notice]
+
+-------------------------------------------------------------------------------
+SOFTWARE DISTRIBUTED WITH FOUNDATIONDB:
+
+The FoundationDB software includes a number of subcomponents with separate
+copyright notices and license terms - please see the file ACKNOWLEDGEMENTS.
+-------------------------------------------------------------------------------
@@ -176,6 +176,9 @@
 .. |transaction-get-committed-version-blurb| replace::
     Gets the version number at which a successful commit modified the database. This must be called only after the successful (non-error) completion of a call to |commit-func| on this Transaction, or the behavior is undefined. Read-only transactions do not modify the database when committed and will have a committed version of -1. Keep in mind that a transaction which reads keys and then sets them to their current values may be optimized to a read-only transaction.
 
+.. |transaction-get-approximate-size-blurb| replace::
+    Gets the approximate transaction size so far, which is the summation of the estimated size of mutations, read conflict ranges, and write conflict ranges.
+
 .. |transaction-get-versionstamp-blurb| replace::
     Returns a future which will contain the versionstamp which was used by any versionstamp operations in this transaction. This function must be called before a call to |commit-func| on this Transaction. The future will be ready only after the successful completion of a call to |commit-func| on this Transaction. Read-only transactions do not modify the database when committed and will result in the future completing with an error. Keep in mind that a transaction which reads keys and then sets them to their current values may be optimized to a read-only transaction.
 
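The versionstamp blurb above encodes an ordering constraint: the future must be requested before commit and can only be read after a successful commit. A hedged Go sketch of that pattern (Go is used only for consistency with the earlier examples; `db` is the hypothetical database handle from those sketches):

```go
// db is the fdb.Database opened in the earlier sketch.
var vsFuture fdb.FutureKey
_, err := db.Transact(func(tr fdb.Transaction) (interface{}, error) {
	tr.Set(fdb.Key("hello"), []byte("world")) // any write keeps the commit from being read-only
	vsFuture = tr.GetVersionstamp()           // must be requested before commit
	return nil, nil
})
if err != nil {
	log.Fatal(err)
}
vs := vsFuture.MustGet() // becomes ready only after the successful commit above
log.Printf("versionstamp: %x", vs)
```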
@@ -805,6 +805,13 @@ Transaction misc functions
 
 .. _api-python-transaction-options:
 
+Transaction misc functions
+--------------------------
+
+.. method:: Transaction.get_approximate_size()
+
+    |transaction-get-approximate-size-blurb|. Returns a :class:`FutureInt64`.
+
 Transaction options
 -------------------
 
@@ -736,7 +736,7 @@ Most applications should use the read version that FoundationDB determines autom
 
    |infrequent| |transaction-get-committed-version-blurb|
 
-.. method:: Transaction.get_verionstamp() -> String
+.. method:: Transaction.get_versionstamp() -> String
 
    |infrequent| |transaction-get-versionstamp-blurb|
 
@@ -747,6 +747,10 @@ Transaction misc functions
 
    Get the estimated byte size of the given key range. Returns a :class:`Int64Future`.
 
+.. method:: Transaction.get_approximate_size() -> Int64Future
+
+   |transaction-get-approximate-size-blurb|. Returns a :class:`Int64Future`.
+
 Transaction options
 -------------------
 
@@ -10,38 +10,38 @@ macOS
 
 The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
 
-* `FoundationDB-6.2.19.pkg <https://www.foundationdb.org/downloads/6.2.19/macOS/installers/FoundationDB-6.2.19.pkg>`_
+* `FoundationDB-6.2.20.pkg <https://www.foundationdb.org/downloads/6.2.20/macOS/installers/FoundationDB-6.2.20.pkg>`_
 
 Ubuntu
 ------
 
 The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
 
-* `foundationdb-clients-6.2.19-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.19/ubuntu/installers/foundationdb-clients_6.2.19-1_amd64.deb>`_
-* `foundationdb-server-6.2.19-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.19/ubuntu/installers/foundationdb-server_6.2.19-1_amd64.deb>`_ (depends on the clients package)
+* `foundationdb-clients-6.2.20-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.20/ubuntu/installers/foundationdb-clients_6.2.20-1_amd64.deb>`_
+* `foundationdb-server-6.2.20-1_amd64.deb <https://www.foundationdb.org/downloads/6.2.20/ubuntu/installers/foundationdb-server_6.2.20-1_amd64.deb>`_ (depends on the clients package)
 
 RHEL/CentOS EL6
 ---------------
 
 The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
 
-* `foundationdb-clients-6.2.19-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel6/installers/foundationdb-clients-6.2.19-1.el6.x86_64.rpm>`_
-* `foundationdb-server-6.2.19-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel6/installers/foundationdb-server-6.2.19-1.el6.x86_64.rpm>`_ (depends on the clients package)
+* `foundationdb-clients-6.2.20-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.20/rhel6/installers/foundationdb-clients-6.2.20-1.el6.x86_64.rpm>`_
+* `foundationdb-server-6.2.20-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.20/rhel6/installers/foundationdb-server-6.2.20-1.el6.x86_64.rpm>`_ (depends on the clients package)
 
 RHEL/CentOS EL7
 ---------------
 
 The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
 
-* `foundationdb-clients-6.2.19-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel7/installers/foundationdb-clients-6.2.19-1.el7.x86_64.rpm>`_
-* `foundationdb-server-6.2.19-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.19/rhel7/installers/foundationdb-server-6.2.19-1.el7.x86_64.rpm>`_ (depends on the clients package)
+* `foundationdb-clients-6.2.20-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.20/rhel7/installers/foundationdb-clients-6.2.20-1.el7.x86_64.rpm>`_
+* `foundationdb-server-6.2.20-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.2.20/rhel7/installers/foundationdb-server-6.2.20-1.el7.x86_64.rpm>`_ (depends on the clients package)
 
 Windows
 -------
 
 The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
 
-* `foundationdb-6.2.19-x64.msi <https://www.foundationdb.org/downloads/6.2.19/windows/installers/foundationdb-6.2.19-x64.msi>`_
+* `foundationdb-6.2.20-x64.msi <https://www.foundationdb.org/downloads/6.2.20/windows/installers/foundationdb-6.2.20-x64.msi>`_
 
 API Language Bindings
 =====================
 
@@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
 
 If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
 
-* `foundationdb-6.2.19.tar.gz <https://www.foundationdb.org/downloads/6.2.19/bindings/python/foundationdb-6.2.19.tar.gz>`_
+* `foundationdb-6.2.20.tar.gz <https://www.foundationdb.org/downloads/6.2.20/bindings/python/foundationdb-6.2.20.tar.gz>`_
 
 Ruby 1.9.3/2.0.0+
 -----------------
 
-* `fdb-6.2.19.gem <https://www.foundationdb.org/downloads/6.2.19/bindings/ruby/fdb-6.2.19.gem>`_
+* `fdb-6.2.20.gem <https://www.foundationdb.org/downloads/6.2.20/bindings/ruby/fdb-6.2.20.gem>`_
 
 Java 8+
 -------
 
-* `fdb-java-6.2.19.jar <https://www.foundationdb.org/downloads/6.2.19/bindings/java/fdb-java-6.2.19.jar>`_
-* `fdb-java-6.2.19-javadoc.jar <https://www.foundationdb.org/downloads/6.2.19/bindings/java/fdb-java-6.2.19-javadoc.jar>`_
+* `fdb-java-6.2.20.jar <https://www.foundationdb.org/downloads/6.2.20/bindings/java/fdb-java-6.2.20.jar>`_
+* `fdb-java-6.2.20-javadoc.jar <https://www.foundationdb.org/downloads/6.2.20/bindings/java/fdb-java-6.2.20-javadoc.jar>`_
 
 Go 1.11+
 --------
 
@@ -2,7 +2,7 @@
 Release Notes
 #############
 
-7.0.0
+6.3.0
 =====
 
 Features
@@ -31,7 +31,6 @@ Bindings
 * Go: Added a ``Close`` function to ``RangeIterator`` which **must** be called to free resources returned from ``Transaction.GetRange``. `(PR #1910) <https://github.com/apple/foundationdb/pull/1910>`_.
 * Go: Finalizers are no longer used to clean up native resources. ``Future`` results are now copied from the native heap to the Go heap, and native resources are freed immediately. `(PR #1910) <https://github.com/apple/foundationdb/pull/1910>`_.
 
-
 Other Changes
 -------------
 * Double the number of shard locations that the client will cache locally. `(PR #2198) <https://github.com/apple/foundationdb/pull/2198>`_
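A short sketch of the iterator pattern implied by the first Go note above: `Close` is deferred so native resources are freed even when iteration stops early. It reuses the hypothetical `db` handle from the earlier sketches, and the key range is illustrative:

```go
// db is the fdb.Database opened in the earlier sketch.
_, err := db.ReadTransact(func(tr fdb.ReadTransaction) (interface{}, error) {
	it := tr.GetRange(
		fdb.KeyRange{Begin: fdb.Key(""), End: fdb.Key("\xff")},
		fdb.RangeOptions{},
	).Iterator()
	defer it.Close() // per the note, required to free resources returned from GetRange
	for it.Advance() {
		kv, err := it.Get()
		if err != nil {
			return nil, err
		}
		_ = kv // process kv.Key / kv.Value here
	}
	return nil, nil
})
if err != nil {
	log.Fatal(err)
}
```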
@@ -284,6 +284,7 @@ struct KeyRangeRef {
 	force_inline void serialize(Ar& ar) {
 		serializer(ar, const_cast<KeyRef&>(begin), const_cast<KeyRef&>(end));
 		if( begin > end ) {
+			TraceEvent("InvertedRange").detail("Begin", begin).detail("End", end);
 			throw inverted_range();
 		};
 	}
@@ -76,16 +76,7 @@ TLSConfig tlsConfig(TLSEndpointType::CLIENT);
 NetworkOptions::NetworkOptions()
 	: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
 	  traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
-	  traceFormat("xml"), traceClockSource("now"), runLoopProfilingEnabled(false) {
-
-	Standalone<VectorRef<ClientVersionRef>> defaultSupportedVersions;
-
-	StringRef sourceVersion = StringRef((const uint8_t*)getSourceVersion(), strlen(getSourceVersion()));
-	std::string protocolVersionString = format("%llx", currentProtocolVersion.version());
-	defaultSupportedVersions.push_back_deep(defaultSupportedVersions.arena(), ClientVersionRef(LiteralStringRef(FDB_VT_VERSION), sourceVersion, protocolVersionString));
-
-	supportedVersions = ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>::from(defaultSupportedVersions);
-}
+	  traceFormat("xml"), traceClockSource("now"), runLoopProfilingEnabled(false), supportedVersions(new ReferencedObject<Standalone<VectorRef<ClientVersionRef>>>()) {}
 
 static const Key CLIENT_LATENCY_INFO_PREFIX = LiteralStringRef("client_latency/");
 static const Key CLIENT_LATENCY_INFO_CTR_PREFIX = LiteralStringRef("client_latency_counter/");
@@ -1045,7 +1036,10 @@ void setupNetwork(uint64_t transportId, bool useMetrics) {
 	if (!networkOptions.logClientInfo.present())
 		networkOptions.logClientInfo = true;
 
+	TLS::DisableOpenSSLAtExitHandler();
 	g_network = newNet2(tlsConfig, false, useMetrics || networkOptions.traceDirectory.present());
+	g_network->addStopCallback( Net2FileSystem::stop );
+	g_network->addStopCallback( TLS::DestroyOpenSSLGlobalState );
 	FlowTransport::createInstance(true, transportId);
 	Net2FileSystem::newFileSystem();
 }
@@ -52,6 +52,10 @@ public:
 		}
 	}
 
+	static void stop() {
+		eio_set_max_parallel(0);
+	}
+
 	static bool should_poll() { return want_poll; }
 
 	static bool lock_fd( int fd ) {
@@ -39,6 +39,8 @@ class AsyncFileWinASIO : public IAsyncFile, public ReferenceCounted<AsyncFileWin
 public:
 	static void init() {}
 
+	static void stop() {}
+
 	static bool should_poll() { return false; }
 	// FIXME: This implementation isn't actually asynchronous - it just does operations synchronously!
@@ -236,6 +236,7 @@ struct YieldMockNetwork : INetwork, ReferenceCounted<YieldMockNetwork> {
 	virtual double now() { return baseNetwork->now(); }
 	virtual double timer() { return baseNetwork->timer(); }
 	virtual void stop() { return baseNetwork->stop(); }
+	virtual void addStopCallback( std::function<void()> fn ) { ASSERT(false); return; }
 	virtual bool isSimulated() const { return baseNetwork->isSimulated(); }
 	virtual void onMainThread(Promise<Void>&& signal, TaskPriority taskID) { return baseNetwork->onMainThread(std::move(signal), taskID); }
 	bool isOnMainThread() const override { return baseNetwork->isOnMainThread(); }
@@ -295,7 +295,7 @@ static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, IS
 ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
 	state Endpoint remotePingEndpoint({ peer->destination }, WLTOKEN_PING_PACKET);
 	loop {
-		if (!FlowTransport::transport().isClient() && !peer->destination.isPublic() && peer->compatible) {
+		if (!FlowTransport::isClient() && !peer->destination.isPublic() && peer->compatible) {
 			// Don't send ping messages to clients unless necessary. Instead monitor incoming client pings.
 			// We ignore this block for incompatible clients because pings from server would trigger the
 			// peer->resetPing and prevent 'connection_failed' due to ping timeout.
@@ -324,7 +324,7 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
 				(peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY)) {
 				// TODO: What about when peerReference == -1?
 				throw connection_unreferenced();
-			} else if (FlowTransport::transport().isClient() && peer->compatible && peer->destination.isPublic() &&
+			} else if (FlowTransport::isClient() && peer->compatible && peer->destination.isPublic() &&
 				(peer->lastConnectTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT) &&
 				(peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT)) {
 				// First condition is necessary because we may get here if we are server.
@@ -521,7 +521,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 
 			if(self->destination.isPublic()
 				&& IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()
-				&& !FlowTransport::transport().isClient())
+				&& !FlowTransport::isClient())
 			{
 				auto& it = self->transport->closedPeers[self->destination];
 				if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
@@ -662,6 +662,9 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
 		} catch (Error& e) {
 			g_currentDeliveryPeerAddress = {NetworkAddress()};
 			TraceEvent(SevError, "ReceiverError").error(e).detail("Token", destination.token.toString()).detail("Peer", destination.getPrimaryAddress());
+			if(!FlowTransport::isClient()) {
+				flushAndExit(FDB_EXIT_ERROR);
+			}
 			throw;
 		}
 	} else if (destination.token.first() & TOKEN_STREAM_FLAG) {
@@ -115,3 +115,7 @@ Net2FileSystem::Net2FileSystem(double ioTimeout, std::string fileSystemPath)
 	}
 #endif
 }
+
+void Net2FileSystem::stop() {
+	Net2AsyncFile::stop();
+}
@@ -36,6 +36,7 @@ public:
 	virtual Future< std::time_t > lastWriteTime( std::string filename );
 
 	//void init();
+	static void stop();
 
 	Net2FileSystem(double ioTimeout=0.0, std::string fileSystemPath = "");
 
@@ -871,7 +871,12 @@ public:
 		return emptyConfig;
 	}
 
-	virtual void stop() { isStopped = true; }
+	virtual void stop() {
+		isStopped = true;
+	}
+	virtual void addStopCallback( std::function<void()> fn ) {
+		stopCallbacks.emplace_back(std::move(fn));
+	}
 	virtual bool isSimulated() const { return true; }
 
 	struct SimThreadArgs {
@@ -995,6 +1000,9 @@ public:
 		}
 		self->currentProcess = callingMachine;
 		self->net2->stop();
+		for ( auto& fn : self->stopCallbacks ) {
+			fn();
+		}
 		return Void();
 	}
 
@@ -1615,6 +1623,7 @@ public:
 		// Not letting currentProcess be NULL eliminates some annoying special cases
 		currentProcess = new ProcessInfo("NoMachine", LocalityData(Optional<Standalone<StringRef>>(), StringRef(), StringRef(), StringRef()), ProcessClass(), {NetworkAddress()}, this, "", "");
 		g_network = net2 = newNet2(TLSConfig(), false, true);
+		g_network->addStopCallback( Net2FileSystem::stop );
 		Net2FileSystem::newFileSystem();
 		check_yield(TaskPriority::Zero);
 	}
@@ -1713,6 +1722,8 @@ public:
 	//tasks is guarded by ISimulator::mutex
 	std::priority_queue<Task, std::vector<Task>> tasks;
 
+	std::vector<std::function<void()>> stopCallbacks;
+
 	//Sim2Net network;
 	INetwork *net2;
 
@@ -216,7 +216,9 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
 	++(*clientCount);
 	hasConnectedClients->set(true);
 
-	db->clientStatusInfoMap[req.reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(req.traceLogGroup, req.supportedVersions, req.issues);
+	if(req.supportedVersions.size() > 0) {
+		db->clientStatusInfoMap[req.reply.getEndpoint().getPrimaryAddress()] = ClientStatusInfo(req.traceLogGroup, req.supportedVersions, req.issues);
+	}
 
 	while (db->clientInfo->get().read().id == req.knownClientInfoID && !db->clientInfo->get().read().forward.present()) {
 		choose {
@@ -225,7 +227,9 @@ ACTOR Future<Void> openDatabase(ClientData* db, int* clientCount, Reference<Asyn
 		}
 	}
 
-	db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
+	if(req.supportedVersions.size() > 0) {
+		db->clientStatusInfoMap.erase(req.reply.getEndpoint().getPrimaryAddress());
+	}
 
 	req.reply.send( db->clientInfo->get() );
 
@@ -88,6 +88,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
 	init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
 	init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();
 	init( TLOG_MAX_CREATE_DURATION, 10.0 );
+	init( PEEK_LOGGING_AMOUNT, 5 );
+	init( PEEK_LOGGING_DELAY, 5.0 );
 
 	// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
 	init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
@@ -86,6 +86,8 @@ public:
 	int64_t MAX_CACHE_VERSIONS;
 	double TXS_POPPED_MAX_DELAY;
 	double TLOG_MAX_CREATE_DURATION;
+	int PEEK_LOGGING_AMOUNT;
+	double PEEK_LOGGING_DELAY;
 
 	// Data distribution queue
 	double HEALTH_POLL_TIME;
@@ -385,6 +385,43 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	struct PeekTrackerData {
 		std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
 		double lastUpdate;
+
+		Tag tag;
+
+		double lastLogged;
+		int64_t totalPeeks;
+		int64_t replyBytes;
+		int64_t duplicatePeeks;
+		double queueTime;
+		double queueMax;
+		double blockTime;
+		double blockMax;
+		double workTime;
+		double workMax;
+
+		int64_t unblockedPeeks;
+		double idleTime;
+		double idleMax;
+
+		PeekTrackerData() : lastUpdate(0) {
+			resetMetrics();
+		}
+
+		void resetMetrics() {
+			lastLogged = now();
+			totalPeeks = 0;
+			replyBytes = 0;
+			duplicatePeeks = 0;
+			queueTime = 0;
+			queueMax = 0;
+			blockTime = 0;
+			blockMax = 0;
+			workTime = 0;
+			workMax = 0;
+			unblockedPeeks = 0;
+			idleTime = 0;
+			idleMax = 0;
+		}
 	};
 
 	std::map<UID, PeekTrackerData> peekTracker;
@@ -1049,6 +1086,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	state BinaryWriter messages2(Unversioned());
 	state int sequence = -1;
 	state UID peekId;
+	state double queueStart = now();
 
 	if(req.sequence.present()) {
 		try {
@@ -1059,6 +1097,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			}
 			auto& trackerData = logData->peekTracker[peekId];
 			if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
+				trackerData.tag = req.tag;
 				trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
 			}
 			auto seqBegin = trackerData.sequence_version.begin();
@@ -1074,8 +1113,16 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 				throw operation_obsolete();
 			}
 
+			Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
+			if(fPrevPeekData.isReady()) {
+				trackerData.unblockedPeeks++;
+				double t = now() - trackerData.lastUpdate;
+				if(t > trackerData.idleMax) trackerData.idleMax = t;
+				trackerData.idleTime += t;
+			}
 			trackerData.lastUpdate = now();
-			std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
+			std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
+
 			req.begin = std::max(prevPeekData.first, req.begin);
 			req.onlySpilled = prevPeekData.second;
 			wait(yield());
@@ -1089,6 +1136,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		}
 	}
 
+	state double blockStart = now();
+
 	if( req.returnIfBlocked && logData->version.get() < req.begin ) {
 		req.reply.sendError(end_of_stream());
 		if(req.sequence.present()) {
@@ -1123,6 +1172,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		wait(delay(0, TaskPriority::TLogSpilledPeekReply));
 	}
 
+	state double workStart = now();
+
 	Version poppedVer = poppedVersion(logData, req.tag);
 	if(poppedVer > req.begin) {
 		TLogPeekReply rep;
@@ -1211,6 +1262,22 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	if(req.sequence.present()) {
 		auto& trackerData = logData->peekTracker[peekId];
 		trackerData.lastUpdate = now();
+
+		double queueT = blockStart-queueStart;
+		double blockT = workStart-blockStart;
+		double workT = now()-workStart;
+
+		trackerData.totalPeeks++;
+		trackerData.replyBytes += reply.messages.size();
+
+		if(queueT > trackerData.queueMax) trackerData.queueMax = queueT;
+		if(blockT > trackerData.blockMax) trackerData.blockMax = blockT;
+		if(workT > trackerData.workMax) trackerData.workMax = workT;
+
+		trackerData.queueTime += queueT;
+		trackerData.blockTime += blockT;
+		trackerData.workTime += workT;
+
 		auto& sequenceData = trackerData.sequence_version[sequence+1];
 		if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
 			req.reply.sendError(operation_obsolete());
@@ -1219,6 +1286,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			return Void();
 		}
 		if(sequenceData.isSet()) {
+			trackerData.duplicatePeeks++;
 			if(sequenceData.getFuture().get().first != reply.end) {
 				TEST(true); //tlog peek second attempt ended at a different version
 				req.reply.sendError(operation_obsolete());
@@ -1542,6 +1610,47 @@ ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
 	}
 }
 
+ACTOR Future<Void> logPeekTrackers( LogData* logData ) {
+	loop {
+		int64_t logThreshold = 1;
+		if(logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) {
+			std::vector<int64_t> peekCounts;
+			peekCounts.reserve(logData->peekTracker.size());
+			for( auto& it : logData->peekTracker ) {
+				peekCounts.push_back(it.second.totalPeeks);
+			}
+			size_t pivot = peekCounts.size()-SERVER_KNOBS->PEEK_LOGGING_AMOUNT;
+			std::nth_element(peekCounts.begin(), peekCounts.begin()+pivot, peekCounts.end());
+			logThreshold = std::max<int64_t>(1,peekCounts[pivot]);
+		}
+		int logCount = 0;
+		for( auto& it : logData->peekTracker ) {
+			if(it.second.totalPeeks >= logThreshold) {
+				logCount++;
+				TraceEvent("PeekMetrics", logData->logId)
+					.detail("Tag", it.second.tag.toString())
+					.detail("Elapsed", now() - it.second.lastLogged)
+					.detail("MeanReplyBytes", it.second.replyBytes/it.second.totalPeeks)
+					.detail("TotalPeeks", it.second.totalPeeks)
+					.detail("UnblockedPeeks", it.second.unblockedPeeks)
+					.detail("DuplicatePeeks", it.second.duplicatePeeks)
+					.detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1)
+					.detail("IdleSeconds", it.second.idleTime)
+					.detail("IdleMax", it.second.idleMax)
+					.detail("QueueSeconds", it.second.queueTime)
+					.detail("QueueMax", it.second.queueMax)
+					.detail("BlockSeconds", it.second.blockTime)
+					.detail("BlockMax", it.second.blockMax)
+					.detail("WorkSeconds", it.second.workTime)
+					.detail("WorkMax", it.second.workMax);
+				it.second.resetMetrics();
+			}
+		}
+
+		wait( delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1,logCount)) );
+	}
+}
+
 void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
 	TLogQueuingMetricsReply reply;
 	reply.localTime = now();
@@ -1880,6 +1989,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
 	logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
 	logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
 	logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
+	logData->addActor.send( logPeekTrackers(logData.getPtr()) );
 
 	if(!logData->isPrimary) {
 		std::vector<Tag> tags;
@@ -495,6 +495,44 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	struct PeekTrackerData {
 		std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
 		double lastUpdate;
+
+		Tag tag;
+
+		double lastLogged;
+		int64_t totalPeeks;
+		int64_t replyBytes;
+		int64_t duplicatePeeks;
+		double queueTime;
+		double queueMax;
+		double blockTime;
+		double blockMax;
+		double workTime;
+		double workMax;
+
+		int64_t unblockedPeeks;
+		double idleTime;
+		double idleMax;
+
+		PeekTrackerData() : lastUpdate(0) {
+			resetMetrics();
+		}
+
+		void resetMetrics() {
+			lastLogged = now();
+			totalPeeks = 0;
+			replyBytes = 0;
+			duplicatePeeks = 0;
+			queueTime = 0;
+			queueMax = 0;
+			blockTime = 0;
+			blockMax = 0;
+			workTime = 0;
+			workMax = 0;
+			unblockedPeeks = 0;
+			idleTime = 0;
+			idleMax = 0;
+		}
+
 	};
 
 	std::map<UID, PeekTrackerData> peekTracker;
@@ -1352,6 +1390,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	state BinaryWriter messages2(Unversioned());
 	state int sequence = -1;
 	state UID peekId;
+	state double queueStart = now();
 
 	if(req.sequence.present()) {
 		try {
@@ -1362,6 +1401,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			}
 			auto& trackerData = logData->peekTracker[peekId];
 			if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
+				trackerData.tag = req.tag;
 				trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
 			}
 			auto seqBegin = trackerData.sequence_version.begin();
@@ -1378,8 +1418,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 				throw operation_obsolete();
 			}
 
+			Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
+			if(fPrevPeekData.isReady()) {
+				trackerData.unblockedPeeks++;
+				double t = now() - trackerData.lastUpdate;
+				if(t > trackerData.idleMax) trackerData.idleMax = t;
+				trackerData.idleTime += t;
+			}
 			trackerData.lastUpdate = now();
-			std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
+			std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
 			req.begin = std::max(prevPeekData.first, req.begin);
 			req.onlySpilled = prevPeekData.second;
 			wait(yield());
@@ -1393,6 +1440,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		}
 	}
 
+	state double blockStart = now();
+
 	if( req.returnIfBlocked && logData->version.get() < req.begin ) {
 		req.reply.sendError(end_of_stream());
 		if(req.sequence.present()) {
@@ -1427,6 +1476,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		wait(delay(0, TaskPriority::TLogSpilledPeekReply));
 	}
 
+	state double workStart = now();
+
 	Version poppedVer = poppedVersion(logData, req.tag);
 	if(poppedVer > req.begin) {
 		TLogPeekReply rep;
@@ -1603,6 +1654,22 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	if(req.sequence.present()) {
 		auto& trackerData = logData->peekTracker[peekId];
 		trackerData.lastUpdate = now();
+
+		double queueT = blockStart-queueStart;
+		double blockT = workStart-blockStart;
+		double workT = now()-workStart;
+
+		trackerData.totalPeeks++;
+		trackerData.replyBytes += reply.messages.size();
+
+		if(queueT > trackerData.queueMax) trackerData.queueMax = queueT;
+		if(blockT > trackerData.blockMax) trackerData.blockMax = blockT;
+		if(workT > trackerData.workMax) trackerData.workMax = workT;
+
+		trackerData.queueTime += queueT;
+		trackerData.blockTime += blockT;
+		trackerData.workTime += workT;
+
 		auto& sequenceData = trackerData.sequence_version[sequence+1];
 		if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
 			req.reply.sendError(operation_obsolete());
@@ -1611,6 +1678,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			return Void();
 		}
 		if(sequenceData.isSet()) {
+			trackerData.duplicatePeeks++;
 			if(sequenceData.getFuture().get().first != reply.end) {
 				TEST(true); //tlog peek second attempt ended at a different version
 				req.reply.sendError(operation_obsolete());
@@ -1934,6 +2002,47 @@ ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
 	}
 }
 
+ACTOR Future<Void> logPeekTrackers( LogData* logData ) {
+	loop {
+		int64_t logThreshold = 1;
+		if(logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) {
+			std::vector<int64_t> peekCounts;
+			peekCounts.reserve(logData->peekTracker.size());
+			for( auto& it : logData->peekTracker ) {
+				peekCounts.push_back(it.second.totalPeeks);
+			}
+			size_t pivot = peekCounts.size()-SERVER_KNOBS->PEEK_LOGGING_AMOUNT;
+			std::nth_element(peekCounts.begin(), peekCounts.begin()+pivot, peekCounts.end());
+			logThreshold = std::max<int64_t>(1,peekCounts[pivot]);
+		}
+		int logCount = 0;
+		for( auto& it : logData->peekTracker ) {
+			if(it.second.totalPeeks >= logThreshold) {
+				logCount++;
+				TraceEvent("PeekMetrics", logData->logId)
+					.detail("Tag", it.second.tag.toString())
+					.detail("Elapsed", now() - it.second.lastLogged)
+					.detail("MeanReplyBytes", it.second.replyBytes/it.second.totalPeeks)
+					.detail("TotalPeeks", it.second.totalPeeks)
+					.detail("UnblockedPeeks", it.second.unblockedPeeks)
+					.detail("DuplicatePeeks", it.second.duplicatePeeks)
+					.detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1)
+					.detail("IdleSeconds", it.second.idleTime)
+					.detail("IdleMax", it.second.idleMax)
+					.detail("QueueSeconds", it.second.queueTime)
+					.detail("QueueMax", it.second.queueMax)
+					.detail("BlockSeconds", it.second.blockTime)
+					.detail("BlockMax", it.second.blockMax)
+					.detail("WorkSeconds", it.second.workTime)
+					.detail("WorkMax", it.second.workMax);
+				it.second.resetMetrics();
+			}
+		}
+
+		wait( delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1,logCount)) );
+	}
+}
+
 void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
 	TLogQueuingMetricsReply reply;
 	reply.localTime = now();
@@ -2283,6 +2392,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
 	logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
 	logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
 	logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
+	logData->addActor.send( logPeekTrackers(logData.getPtr()) );
 
 	if(!logData->isPrimary) {
 		std::vector<Tag> tags;
@@ -491,6 +491,43 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
 	struct PeekTrackerData {
 		std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
 		double lastUpdate;
+
+		Tag tag;
+
+		double lastLogged;
+		int64_t totalPeeks;
+		int64_t replyBytes;
+		int64_t duplicatePeeks;
+		double queueTime;
+		double queueMax;
+		double blockTime;
+		double blockMax;
+		double workTime;
+		double workMax;
+
+		int64_t unblockedPeeks;
+		double idleTime;
+		double idleMax;
+
+		PeekTrackerData() : lastUpdate(0) {
+			resetMetrics();
+		}
+
+		void resetMetrics() {
+			lastLogged = now();
+			totalPeeks = 0;
+			replyBytes = 0;
+			duplicatePeeks = 0;
+			queueTime = 0;
+			queueMax = 0;
+			blockTime = 0;
+			blockMax = 0;
+			workTime = 0;
+			workMax = 0;
+			unblockedPeeks = 0;
+			idleTime = 0;
+			idleMax = 0;
+		}
 	};
 
 	std::map<UID, PeekTrackerData> peekTracker;
@@ -1366,6 +1403,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 	state BinaryWriter messages2(Unversioned());
 	state int sequence = -1;
 	state UID peekId;
+	state double queueStart = now();
 
 	if(req.sequence.present()) {
 		try {
@@ -1376,6 +1414,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			}
 			auto& trackerData = logData->peekTracker[peekId];
 			if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
+				trackerData.tag = req.tag;
 				trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
 			}
 			auto seqBegin = trackerData.sequence_version.begin();
@@ -1392,8 +1431,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 				throw operation_obsolete();
 			}
 
+			Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
+			if(fPrevPeekData.isReady()) {
+				trackerData.unblockedPeeks++;
+				double t = now() - trackerData.lastUpdate;
+				if(t > trackerData.idleMax) trackerData.idleMax = t;
+				trackerData.idleTime += t;
+			}
 			trackerData.lastUpdate = now();
-			std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
+			std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
 			req.begin = std::max(prevPeekData.first, req.begin);
 			req.onlySpilled = prevPeekData.second;
 			wait(yield());
@@ -1407,6 +1453,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		}
 	}
 
+	state double blockStart = now();
+
 	if( req.returnIfBlocked && logData->version.get() < req.begin ) {
 		req.reply.sendError(end_of_stream());
 		if(req.sequence.present()) {
@@ -1442,6 +1490,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 		wait(delay(0, TaskPriority::TLogSpilledPeekReply));
 	}
 
+	state double workStart = now();
+
 	Version poppedVer = poppedVersion(logData, req.tag);
 	if(poppedVer > req.begin) {
 		TLogPeekReply rep;
@@ -1617,8 +1667,24 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 
 	if(req.sequence.present()) {
 		auto& trackerData = logData->peekTracker[peekId];
-		auto& sequenceData = trackerData.sequence_version[sequence+1];
 		trackerData.lastUpdate = now();
+
+		double queueT = blockStart-queueStart;
+		double blockT = workStart-blockStart;
+		double workT = now()-workStart;
+
+		trackerData.totalPeeks++;
+		trackerData.replyBytes += reply.messages.size();
+
+		if(queueT > trackerData.queueMax) trackerData.queueMax = queueT;
+		if(blockT > trackerData.blockMax) trackerData.blockMax = blockT;
+		if(workT > trackerData.workMax) trackerData.workMax = workT;
+
+		trackerData.queueTime += queueT;
+		trackerData.blockTime += blockT;
+		trackerData.workTime += workT;
+
+		auto& sequenceData = trackerData.sequence_version[sequence+1];
 		if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
 			req.reply.sendError(operation_obsolete());
 			if(!sequenceData.isSet()) {
@@ -1631,6 +1697,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
 			return Void();
 		}
 		if(sequenceData.isSet()) {
+			trackerData.duplicatePeeks++;
 			if(sequenceData.getFuture().get().first != reply.end) {
 				TEST(true); //tlog peek second attempt ended at a different version
 				req.reply.sendError(operation_obsolete());
@@ -1956,6 +2023,47 @@ ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
 	}
 }
 
+ACTOR Future<Void> logPeekTrackers( LogData* logData ) {
+	loop {
+		int64_t logThreshold = 1;
+		if(logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) {
+			std::vector<int64_t> peekCounts;
+			peekCounts.reserve(logData->peekTracker.size());
+			for( auto& it : logData->peekTracker ) {
+				peekCounts.push_back(it.second.totalPeeks);
+			}
+			size_t pivot = peekCounts.size()-SERVER_KNOBS->PEEK_LOGGING_AMOUNT;
+			std::nth_element(peekCounts.begin(), peekCounts.begin()+pivot, peekCounts.end());
+			logThreshold = std::max<int64_t>(1,peekCounts[pivot]);
+		}
+		int logCount = 0;
+		for( auto& it : logData->peekTracker ) {
+			if(it.second.totalPeeks >= logThreshold) {
+				logCount++;
+				TraceEvent("PeekMetrics", logData->logId)
+					.detail("Tag", it.second.tag.toString())
+					.detail("Elapsed", now() - it.second.lastLogged)
+					.detail("MeanReplyBytes", it.second.replyBytes/it.second.totalPeeks)
+					.detail("TotalPeeks", it.second.totalPeeks)
+					.detail("UnblockedPeeks", it.second.unblockedPeeks)
+					.detail("DuplicatePeeks", it.second.duplicatePeeks)
+					.detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1)
+					.detail("IdleSeconds", it.second.idleTime)
+					.detail("IdleMax", it.second.idleMax)
+					.detail("QueueSeconds", it.second.queueTime)
+					.detail("QueueMax", it.second.queueMax)
+					.detail("BlockSeconds", it.second.blockTime)
+					.detail("BlockMax", it.second.blockMax)
+					.detail("WorkSeconds", it.second.workTime)
+					.detail("WorkMax", it.second.workMax);
+				it.second.resetMetrics();
+			}
+		}
+
+		wait( delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1,logCount)) );
+	}
+}
+
 void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
 	TLogQueuingMetricsReply reply;
 	reply.localTime = now();
@@ -2302,6 +2410,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
 	logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
 	logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
 	logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
+	logData->addActor.send( logPeekTrackers(logData.getPtr()) );
 
 	if(!logData->isPrimary) {
 		std::vector<Tag> tags;
@ -81,6 +81,7 @@ struct WorkerInterface {
|
|||
logRouter.getEndpoint( TaskPriority::Worker );
|
||||
debugPing.getEndpoint( TaskPriority::Worker );
|
||||
coordinationPing.getEndpoint( TaskPriority::Worker );
|
||||
eventLogRequest.getEndpoint( TaskPriority::Worker );
|
||||
}
|
||||
|
||||
template <class Ar>
|
||||
|
|
|
@ -1629,6 +1629,7 @@ int main(int argc, char* argv[]) {
|
|||
openTraceFile(NetworkAddress(), opts.rollsize, opts.maxLogsSize, opts.logFolder, "trace", opts.logGroup);
|
||||
} else {
|
||||
g_network = newNet2(opts.tlsConfig, opts.useThreadPool, true);
|
||||
g_network->addStopCallback( Net2FileSystem::stop );
|
||||
FlowTransport::createInstance(false, 1);
|
||||
|
||||
const bool expectsPublicAddress = (role == FDBD || role == NetworkTestServer || role == Restore);
|
||||
|
|
|
@ -23,12 +23,13 @@
|
|||
#define BOOST_SYSTEM_NO_LIB
|
||||
#define BOOST_DATE_TIME_NO_LIB
|
||||
#define BOOST_REGEX_NO_LIB
|
||||
#include "boost/asio.hpp"
|
||||
#include "boost/bind.hpp"
|
||||
#include "boost/date_time/posix_time/posix_time_types.hpp"
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
#include <boost/date_time/posix_time/posix_time_types.hpp>
|
||||
#include <boost/range.hpp>
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
#include "flow/network.h"
|
||||
#include "flow/IThreadPool.h"
|
||||
#include "boost/range.hpp"
|
||||
|
||||
#include "flow/ActorCollection.h"
|
||||
#include "flow/ThreadSafeQueue.h"
|
||||
|
@ -142,9 +143,14 @@ public:
|
|||
if ( thread_network == this )
|
||||
stopImmediately();
|
||||
else
|
||||
// SOMEDAY: NULL for deferred error, no analysis of correctness (itp)
|
||||
onMainThreadVoid( [this] { this->stopImmediately(); }, NULL );
|
||||
}
|
||||
virtual void addStopCallback( std::function<void()> fn ) {
|
||||
if ( thread_network == this )
|
||||
stopCallbacks.emplace_back(std::move(fn));
|
||||
else
|
||||
onMainThreadVoid( [this, fn] { this->stopCallbacks.emplace_back(std::move(fn)); }, nullptr );
|
||||
}
|
||||
|
||||
virtual bool isSimulated() const { return false; }
|
||||
virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg);
|
||||
|
@ -232,6 +238,7 @@ public:
|
|||
EventMetricHandle<SlowTask> slowTaskMetric;
|
||||
|
||||
std::vector<std::string> blobCredentialFiles;
|
||||
std::vector<std::function<void()>> stopCallbacks;
|
||||
};
|
||||
|
||||
static boost::asio::ip::address tcpAddress(IPAddress const& n) {
|
||||
|
@ -261,11 +268,19 @@ public:
|
|||
try {
|
||||
if (error) {
|
||||
// Log the error...
|
||||
TraceEvent(SevWarn, errContext, errID).suppressFor(1.0).detail("ErrorCode", error.value()).detail("Message", error.message())
|
||||
{
|
||||
TraceEvent evt(SevWarn, errContext, errID);
|
||||
evt.suppressFor(1.0).detail("ErrorCode", error.value()).detail("Message", error.message());
|
||||
#ifndef TLS_DISABLED
|
||||
.detail("WhichMeans", TLSPolicy::ErrorString(error))
|
||||
// There is no function in OpenSSL to use to check if an error code is from OpenSSL,
|
||||
// but all OpenSSL errors have a non-zero "library" code set in bits 24-32, and linux
|
||||
// error codes should never go that high.
|
||||
if (error.value() >= (1 << 24L)) {
|
||||
evt.detail("WhichMeans", TLSPolicy::ErrorString(error));
|
||||
}
|
||||
#endif
|
||||
;
|
||||
}
|
||||
|
||||
p.sendError( connection_failed() );
|
||||
} else
|
||||
p.send( Void() );
|
||||
|
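
The hunk above relies on the layout of packed OpenSSL error codes: they carry a non-zero library identifier in the high bits, while plain OS errno values stay small, so error.value() >= (1 << 24) is a workable discriminator. A tiny hedged sketch of that check in isolation; the sample values are made up, not real error codes:

// Sketch: classify an error value as "probably OpenSSL" by its high bits,
// mirroring the check added above. Sample inputs are illustrative only.
#include <cstdio>

bool looksLikeOpenSSLError(long value) {
    return value >= (1L << 24);  // errno values never reach this range
}

int main() {
    std::printf("%d\n", looksLikeOpenSSLError(104));                // errno-sized value -> 0
    std::printf("%d\n", looksLikeOpenSSLError((20L << 24) | 0x86)); // packed library|reason -> 1
}
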
@@ -790,11 +805,11 @@ private:
}

void onReadError( const boost::system::error_code& error ) {
TraceEvent(SevWarn, "N2_ReadError", id).suppressFor(1.0).detail("Message", error.value());
TraceEvent(SevWarn, "N2_ReadError", id).suppressFor(1.0).detail("ErrorCode", error.value()).detail("Message", error.message());
closeSocket();
}
void onWriteError( const boost::system::error_code& error ) {
TraceEvent(SevWarn, "N2_WriteError", id).suppressFor(1.0).detail("Message", error.value());
TraceEvent(SevWarn, "N2_WriteError", id).suppressFor(1.0).detail("ErrorCode", error.value()).detail("Message", error.message());
closeSocket();
}
};
@@ -896,13 +911,19 @@ ACTOR static Future<Void> watchFileForChanges( std::string filename, AsyncTrigge
if (filename == "") {
return Never();
}
state std::time_t lastModTime = wait(IAsyncFileSystem::filesystem()->lastWriteTime(filename));
state bool firstRun = true;
state bool statError = false;
state std::time_t lastModTime = 0;
loop {
wait(delay(FLOW_KNOBS->TLS_CERT_REFRESH_DELAY_SECONDS));
try {
std::time_t modtime = wait(IAsyncFileSystem::filesystem()->lastWriteTime(filename));
if (lastModTime != modtime) {
if (firstRun) {
lastModTime = modtime;
firstRun = false;
}
if (lastModTime != modtime || statError) {
lastModTime = modtime;
statError = false;
fileChanged->trigger();
}
} catch (Error& e) {
@@ -912,10 +933,12 @@ ACTOR static Future<Void> watchFileForChanges( std::string filename, AsyncTrigge
// certificates, then there's no point in crashing, but we should complain
// loudly. IAsyncFile will log the error, but not necessarily as a warning.
TraceEvent(SevWarnAlways, "TLSCertificateRefreshStatError").detail("File", filename);
statError = true;
} else {
throw;
}
}
wait(delay(FLOW_KNOBS->TLS_CERT_REFRESH_DELAY_SECONDS));
}
}
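
The watchFileForChanges hunks above switch the certificate watcher to a pure polling scheme: remember the last observed modification time, treat the first successful stat as the baseline, and after a stat error fire the trigger as soon as the file is readable again. A standalone sketch of that polling pattern, using std::filesystem and a plain loop instead of Flow actors; the function name and callback are illustrative:

// Sketch: poll a file's mtime and invoke a callback when it changes or after a
// stat error clears. Mirrors the pattern in watchFileForChanges above.
#include <chrono>
#include <filesystem>
#include <functional>
#include <thread>

void pollForChanges(const std::filesystem::path& file,
                    std::chrono::seconds interval,
                    const std::function<void()>& onChanged) {
    namespace fs = std::filesystem;
    bool haveBaseline = false;
    bool statError = false;
    fs::file_time_type lastModTime{};
    for (;;) {
        std::this_thread::sleep_for(interval);
        std::error_code ec;
        auto modTime = fs::last_write_time(file, ec);
        if (ec) {                // file missing/unreadable: remember and keep polling
            statError = true;
            continue;
        }
        if (!haveBaseline) {     // first successful stat only records the baseline
            lastModTime = modTime;
            haveBaseline = true;
        }
        if (modTime != lastModTime || statError) {
            lastModTime = modTime;
            statError = false;
            onChanged();
        }
    }
}
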
@@ -964,16 +987,22 @@ void Net2::initTLS() {
return;
}
#ifndef TLS_DISABLED
auto onPolicyFailure = [this]() { this->countTLSPolicyFailures++; };
try {
boost::asio::ssl::context newContext(boost::asio::ssl::context::tls);
auto onPolicyFailure = [this]() { this->countTLSPolicyFailures++; };
const LoadedTLSConfig& loaded = tlsConfig.loadSync();
TraceEvent("Net2TLSConfig")
.detail("CAPath", tlsConfig.getCAPathSync())
.detail("CertificatePath", tlsConfig.getCertificatePathSync())
.detail("KeyPath", tlsConfig.getKeyPathSync())
.detail("HasPassword", !loaded.getPassword().empty())
.detail("VerifyPeers", boost::algorithm::join(loaded.getVerifyPeers(), "|"));
ConfigureSSLContext( tlsConfig.loadSync(), &newContext, onPolicyFailure );
sslContextVar.set(ReferencedObject<boost::asio::ssl::context>::from(std::move(newContext)));
backgroundCertRefresh = reloadCertificatesOnChange( tlsConfig, onPolicyFailure, &sslContextVar );
} catch (Error& e) {
TraceEvent("Net2TLSInitError").error(e);
throw tls_error();
}
backgroundCertRefresh = reloadCertificatesOnChange( tlsConfig, onPolicyFailure, &sslContextVar );
#endif
tlsInitialized = true;
}
@@ -1199,6 +1228,10 @@ void Net2::run() {
TraceEvent("SomewhatSlowRunLoopBottom").detail("Elapsed", nnow - now); // This includes the time spent running tasks
}

for ( auto& fn : stopCallbacks ) {
fn();
}

#ifdef WIN32
timeEndPeriod(1);
#endif

@@ -2836,7 +2836,25 @@ void crashHandler(int sig) {
fprintf(stderr, "SIGNAL: %s (%d)\n", strsignal(sig), sig);
fprintf(stderr, "Trace: %s\n", backtrace.c_str());

_exit(128 + sig);
struct sigaction sa;
sa.sa_handler = SIG_DFL;
if (sigemptyset(&sa.sa_mask)) {
int err = errno;
fprintf(stderr, "sigemptyset failed: %s\n", strerror(err));
_exit(sig + 128);
}
sa.sa_flags = 0;
if (sigaction(sig, &sa, NULL)) {
int err = errno;
fprintf(stderr, "sigaction failed: %s\n", strerror(err));
_exit(sig + 128);
}
if (kill(getpid(), sig)) {
int err = errno;
fprintf(stderr, "kill failed: %s\n", strerror(err));
_exit(sig + 128);
}
// Rely on kill to end the process
#else
// No crash handler for other platforms!
#endif
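
The crashHandler change above replaces a bare _exit with the restore-and-reraise idiom: reset the signal's disposition to SIG_DFL, then deliver the same signal to the process so it dies with the default action, preserving the real termination status and allowing a core dump. A minimal standalone sketch of the idiom, POSIX only; the function name is hypothetical:

// Sketch: restore the default handler for `sig` and re-raise it so the process
// terminates with that signal's default action, as crashHandler above does.
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>

void restoreAndReraise(int sig) {
    struct sigaction sa;
    std::memset(&sa, 0, sizeof(sa));
    sa.sa_handler = SIG_DFL;  // fall back to the default disposition
    if (sigemptyset(&sa.sa_mask) || sigaction(sig, &sa, nullptr) || kill(getpid(), sig)) {
        std::fprintf(stderr, "re-raise failed: %s\n", std::strerror(errno));
        _exit(128 + sig);     // conventional "killed by signal" exit code
    }
    // If called from a signal handler, the re-raised signal is delivered (and the
    // process terminated) once the handler returns; otherwise it is delivered now.
}
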
@@ -25,6 +25,32 @@
// To force typeinfo to only be emitted once.
TLSPolicy::~TLSPolicy() {}

namespace TLS {

void DisableOpenSSLAtExitHandler() {
#ifdef TLS_DISABLED
return;
#else
static bool once = false;
if (!once) {
once = true;
int success = OPENSSL_init_crypto(OPENSSL_INIT_NO_ATEXIT, nullptr);
if (!success) {
throw tls_error();
}
}
#endif
}

void DestroyOpenSSLGlobalState() {
#ifdef TLS_DISABLED
return;
#else
OPENSSL_cleanup();
#endif
}

} // namespace TLS
#ifdef TLS_DISABLED

void LoadedTLSConfig::print(FILE *fp) {

@@ -36,6 +36,22 @@
#include "flow/Knobs.h"
#include "flow/flow.h"

namespace TLS {

// Force OpenSSL to not register an atexit handler to clean up global state before process exit.
// If you call this, you must also call DestroyOpenSSLGlobalState() before the program exits.
// Calls OPENSSL_init_crypto with OPENSSL_INIT_NO_ATEXIT.
// Must be called before any other OpenSSL function.
void DisableOpenSSLAtExitHandler();

// Frees all global state maintained by OpenSSL.
// Calls OPENSSL_cleanup.
// Must be called before program exit if using DisableOpenSSLAtExitHandler.
// No OpenSSL code may be run after calling this function.
void DestroyOpenSSLGlobalState();

} // namespace TLS

#ifndef TLS_DISABLED

#include <openssl/x509.h>
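
The header comments above pin down a call-order contract: DisableOpenSSLAtExitHandler must run before any other OpenSSL call, and DestroyOpenSSLGlobalState must run before process exit, after which no OpenSSL code may execute. A hedged usage sketch of that ordering; the include path and runApplication are assumptions, only the ordering comes from the comments above:

// Sketch: intended call order for the TLS helpers declared above.
#include "flow/TLSConfig.actor.h"  // assumed location of the namespace TLS declarations

int runApplication();              // hypothetical entry point doing all TLS/OpenSSL work

int main() {
    TLS::DisableOpenSSLAtExitHandler();  // before any other OpenSSL function
    int rc = runApplication();           // all OpenSSL usage happens in here
    TLS::DestroyOpenSSLGlobalState();    // required counterpart before exit
    return rc;                           // no OpenSSL code may run after this point
}
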
@@ -31,10 +31,12 @@
namespace detail {

namespace {
std::vector<int> mWriteToOffsetsMemoy;
thread_local std::vector<int> gWriteToOffsetsMemory;
}

std::vector<int>* writeToOffsetsMemory = &mWriteToOffsetsMemoy;
void swapWithThreadLocalGlobal(std::vector<int>& writeToOffsets) {
gWriteToOffsetsMemory.swap(writeToOffsets);
}

VTable generate_vtable(size_t numMembers, const std::vector<unsigned>& sizesAlignments) {
if (numMembers == 0) {

@@ -348,15 +348,16 @@ struct _SizeOf {
static constexpr unsigned int align = fb_align<T>;
};

extern std::vector<int>* writeToOffsetsMemory;
// Re-use this intermediate memory to avoid frequent new/delete
void swapWithThreadLocalGlobal(std::vector<int>& writeToOffsets);

template <class Context>
struct PrecomputeSize : Context {
PrecomputeSize(const Context& context) : Context(context) {
writeToOffsets.swap(*writeToOffsetsMemory);
swapWithThreadLocalGlobal(writeToOffsets);
writeToOffsets.clear();
}
~PrecomputeSize() { writeToOffsets.swap(*writeToOffsetsMemory); }
~PrecomputeSize() { swapWithThreadLocalGlobal(writeToOffsets); }
// |offset| is measured from the end of the buffer. Precondition: len <=
// offset.
void write(const void*, int offset, int /*len*/) { current_buffer_size = std::max(current_buffer_size, offset); }
@@ -496,7 +497,7 @@ extern VTable generate_vtable(size_t numMembers, const std::vector<unsigned>& si

template <unsigned... MembersAndAlignments>
const VTable* gen_vtable3() {
static VTable table =
static thread_local VTable table =
generate_vtable(sizeof...(MembersAndAlignments) / 2, std::vector<unsigned>{ MembersAndAlignments... });
return &table;
}
@@ -624,7 +625,7 @@ VTableSet get_vtableset_impl(const Root& root, const Context& context) {

template <class Root, class Context>
const VTableSet* get_vtableset(const Root& root, const Context& context) {
static VTableSet result = get_vtableset_impl(root, context);
static thread_local VTableSet result = get_vtableset_impl(root, context);
return &result;
}
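
The flatbuffers hunks above move the cached intermediate buffers and vtables from shared function-local statics to static thread_local, so each thread gets its own lazily built copy and concurrent serialization needs no synchronization on these caches. A small standalone sketch of the same memoization pattern; computeTable is a hypothetical stand-in for generate_vtable:

// Sketch: memoize an expensive, immutable computation per thread with a
// function-local `static thread_local`, as gen_vtable3/get_vtableset above do.
#include <numeric>
#include <vector>

std::vector<unsigned> computeTable() {
    std::vector<unsigned> t(64);
    std::iota(t.begin(), t.end(), 0u);  // stand-in for an expensive computation
    return t;
}

const std::vector<unsigned>* cachedTable() {
    // Built once per thread on first call; later calls on the same thread
    // return the same pointer with no locking and no cross-thread sharing.
    static thread_local std::vector<unsigned> table = computeTable();
    return &table;
}
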
@@ -485,6 +485,10 @@ public:
virtual void stop() = 0;
// Terminate the program

virtual void addStopCallback( std::function<void()> fn ) = 0;
// Calls `fn` when stop() is called.
// addStopCallback can be called more than once, and each added `fn` will be run once.

virtual bool isSimulated() const = 0;
// Returns true if this network is a local simulation
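
The INetwork interface above gains addStopCallback: each registered callback runs once when stop() is invoked, which is how the fdbserver main hunk earlier registers Net2FileSystem::stop and how Net2::run drains stopCallbacks before returning. A hedged usage sketch; the lambda body is illustrative:

// Sketch: registering a stop callback on the global network object,
// analogous to g_network->addStopCallback( Net2FileSystem::stop ) above.
#include "flow/network.h"  // declares INetwork::addStopCallback and g_network
#include <cstdio>

void registerShutdownHook() {
    g_network->addStopCallback([] {
        std::fprintf(stderr, "network stopping; releasing resources\n");
    });
}
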
@@ -32,7 +32,7 @@

<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{F7603B3D-766D-4C5C-906E-4F1FD3BEF455}'
Id='{56C3F81D-327F-4AC4-8432-11B4A4643250}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'