Merge remote-tracking branch 'upstream/master' into toml-testspec

This commit is contained in:
Alex Miller 2020-07-19 22:16:38 -07:00
commit 878254497a
62 changed files with 868 additions and 307 deletions

View File

@ -239,7 +239,9 @@ function(create_correctness_package)
${out_dir}/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh
${out_dir}/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} *
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${package_files}
${out_dir}/joshua_test
${out_dir}/joshua_timeout
WORKING_DIRECTORY ${out_dir}
COMMENT "Package correctness archive"
)
@ -264,7 +266,9 @@ function(create_valgrind_correctness_package)
${out_dir}/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/valgrindTimeout.sh
${out_dir}/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} *
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${package_files}
${out_dir}/joshua_test
${out_dir}/joshua_timeout
WORKING_DIRECTORY ${out_dir}
COMMENT "Package valgrind correctness archive"
)

View File

@ -4,6 +4,7 @@ env_set(USE_GPERFTOOLS OFF BOOL "Use gperfools for profiling")
env_set(USE_DTRACE ON BOOL "Enable dtrace probes on supported platforms")
env_set(USE_VALGRIND OFF BOOL "Compile for valgrind usage")
env_set(USE_VALGRIND_FOR_CTEST ${USE_VALGRIND} BOOL "Use valgrind for ctest")
env_set(VALGRIND_ARENA OFF BOOL "Inform valgrind about arena-allocated memory. Makes valgrind slower but more precise.")
env_set(ALLOC_INSTRUMENTATION OFF BOOL "Instrument alloc")
env_set(WITH_UNDODB OFF BOOL "Use rr or undodb")
env_set(USE_ASAN OFF BOOL "Compile with address sanitizer")
@ -239,7 +240,10 @@ else()
#add_compile_options(-fno-builtin-memcpy)
if (USE_VALGRIND)
add_compile_options(-DVALGRIND -DUSE_VALGRIND)
add_compile_options(-DVALGRIND=1 -DUSE_VALGRIND=1)
endif()
if (VALGRIND_ARENA)
add_compile_options(-DVALGRIND_ARENA=1)
endif()
if (CLANG)
add_compile_options()
@ -271,7 +275,10 @@ else()
-Wno-undefined-var-template
-Wno-tautological-pointer-compare
-Wno-format
-Woverloaded-virtual)
-Wredundant-move
-Wpessimizing-move
-Woverloaded-virtual
)
if (USE_CCACHE)
add_compile_options(
-Wno-register
@ -337,3 +344,4 @@ else()
endif()
endif()
endif()

View File

@ -1158,7 +1158,7 @@ the most part, this also implies that ``T == fdb.tuple.unpack(fdb.tuple.pack(T))
.. method:: has_incomplete_versionstamp(tuple)
Returns ``True`` if there is at least one element contained within the tuple that is a
:class`Versionstamp` instance that is incomplete. If there are multiple incomplete
:class:`Versionstamp` instance that is incomplete. If there are multiple incomplete
:class:`Versionstamp` instances, this method will return ``True``, but trying to pack it into a
byte string will result in an error.

View File

@ -10,38 +10,38 @@ macOS
The macOS installation package is supported on macOS 10.7+. It includes the client and (optionally) the server.
* `FoundationDB-6.3.2.pkg <https://www.foundationdb.org/downloads/6.3.2/macOS/installers/FoundationDB-6.3.2.pkg>`_
* `FoundationDB-6.3.3.pkg <https://www.foundationdb.org/downloads/6.3.3/macOS/installers/FoundationDB-6.3.3.pkg>`_
Ubuntu
------
The Ubuntu packages are supported on 64-bit Ubuntu 12.04+, but beware of the Linux kernel bug in Ubuntu 12.x.
* `foundationdb-clients-6.3.2-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.2/ubuntu/installers/foundationdb-clients_6.3.2-1_amd64.deb>`_
* `foundationdb-server-6.3.2-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.2/ubuntu/installers/foundationdb-server_6.3.2-1_amd64.deb>`_ (depends on the clients package)
* `foundationdb-clients-6.3.3-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.3/ubuntu/installers/foundationdb-clients_6.3.3-1_amd64.deb>`_
* `foundationdb-server-6.3.3-1_amd64.deb <https://www.foundationdb.org/downloads/6.3.3/ubuntu/installers/foundationdb-server_6.3.3-1_amd64.deb>`_ (depends on the clients package)
RHEL/CentOS EL6
---------------
The RHEL/CentOS EL6 packages are supported on 64-bit RHEL/CentOS 6.x.
* `foundationdb-clients-6.3.2-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.2/rhel6/installers/foundationdb-clients-6.3.2-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.2-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.2/rhel6/installers/foundationdb-server-6.3.2-1.el6.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.3-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.3/rhel6/installers/foundationdb-clients-6.3.3-1.el6.x86_64.rpm>`_
* `foundationdb-server-6.3.3-1.el6.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.3/rhel6/installers/foundationdb-server-6.3.3-1.el6.x86_64.rpm>`_ (depends on the clients package)
RHEL/CentOS EL7
---------------
The RHEL/CentOS EL7 packages are supported on 64-bit RHEL/CentOS 7.x.
* `foundationdb-clients-6.3.2-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.2/rhel7/installers/foundationdb-clients-6.3.2-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.2-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.2/rhel7/installers/foundationdb-server-6.3.2-1.el7.x86_64.rpm>`_ (depends on the clients package)
* `foundationdb-clients-6.3.3-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.3/rhel7/installers/foundationdb-clients-6.3.3-1.el7.x86_64.rpm>`_
* `foundationdb-server-6.3.3-1.el7.x86_64.rpm <https://www.foundationdb.org/downloads/6.3.3/rhel7/installers/foundationdb-server-6.3.3-1.el7.x86_64.rpm>`_ (depends on the clients package)
Windows
-------
The Windows installer is supported on 64-bit Windows XP and later. It includes the client and (optionally) the server.
* `foundationdb-6.3.2-x64.msi <https://www.foundationdb.org/downloads/6.3.2/windows/installers/foundationdb-6.3.2-x64.msi>`_
* `foundationdb-6.3.3-x64.msi <https://www.foundationdb.org/downloads/6.3.3/windows/installers/foundationdb-6.3.3-x64.msi>`_
API Language Bindings
=====================
@ -58,18 +58,18 @@ On macOS and Windows, the FoundationDB Python API bindings are installed as part
If you need to use the FoundationDB Python API from other Python installations or paths, use the Python package manager ``pip`` (``pip install foundationdb``) or download the Python package:
* `foundationdb-6.3.2.tar.gz <https://www.foundationdb.org/downloads/6.3.2/bindings/python/foundationdb-6.3.2.tar.gz>`_
* `foundationdb-6.3.3.tar.gz <https://www.foundationdb.org/downloads/6.3.3/bindings/python/foundationdb-6.3.3.tar.gz>`_
Ruby 1.9.3/2.0.0+
-----------------
* `fdb-6.3.2.gem <https://www.foundationdb.org/downloads/6.3.2/bindings/ruby/fdb-6.3.2.gem>`_
* `fdb-6.3.3.gem <https://www.foundationdb.org/downloads/6.3.3/bindings/ruby/fdb-6.3.3.gem>`_
Java 8+
-------
* `fdb-java-6.3.2.jar <https://www.foundationdb.org/downloads/6.3.2/bindings/java/fdb-java-6.3.2.jar>`_
* `fdb-java-6.3.2-javadoc.jar <https://www.foundationdb.org/downloads/6.3.2/bindings/java/fdb-java-6.3.2-javadoc.jar>`_
* `fdb-java-6.3.3.jar <https://www.foundationdb.org/downloads/6.3.3/bindings/java/fdb-java-6.3.3.jar>`_
* `fdb-java-6.3.3-javadoc.jar <https://www.foundationdb.org/downloads/6.3.3/bindings/java/fdb-java-6.3.3-javadoc.jar>`_
Go 1.11+
--------

View File

@ -113,6 +113,44 @@
"counter":0,
"roughness":0.0
},
"grv_latency_statistics":{
"default":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
}
},
"read_latency_statistics":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
},
"commit_latency_statistics":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
},
"grv_latency_bands":{ // How many GRV requests belong to the latency (in seconds) band (e.g., How many requests belong to [0.01,0.1] latency band). The key is the upper bound of the band and the lower bound is the next smallest band (or 0, if none). Example: {0.01: 27, 0.1: 18, 1: 1, inf: 98,filtered: 10}, we have 18 requests in [0.01, 0.1) band.
"$map_key=upperBoundOfBand": 1
},

View File

@ -5,9 +5,17 @@ Release Notes
6.2.23
======
Fixes
-----
* When configured with ``usable_regions=2`` data distribution could temporarily lower the replication of a shard when moving it. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
* Prevent data distribution from running out of memory by fetching the source servers for too many shards in parallel. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
* Reset network connections between log routers and satellite tlogs if the latencies are larger than 500ms. `(PR #3487) <https://github.com/apple/foundationdb/pull/3487>`_
Status
------
* Added per-process server request latency statistics reported in the role section of relevant processes. These are named ``grv_latency_statistics`` and ``commit_latency_statistics`` on proxy roles and ``read_latency_statistics`` on storage roles. `(PR #3480) <https://github.com/apple/foundationdb/pull/3480>`_
* Added ``cluster.active_primary_dc`` that indicates which datacenter is serving as the primary datacenter in multi-region setups. `(PR #3320) <https://github.com/apple/foundationdb/pull/3320>`_
6.2.22

View File

@ -2,7 +2,7 @@
Release Notes
#############
6.3.2
6.3.3
=====
Features
@ -104,10 +104,12 @@ Fixes from previous versions
----------------------------
* The 6.3.1 patch release includes all fixes from the patch releases 6.2.21 and 6.2.22. :doc:`(6.2 Release Notes) </release-notes/release-notes-620>`
* The 6.3.3 patch release includes all fixes from the patch release 6.2.23. :doc:`(6.2 Release Notes) </release-notes/release-notes-620>`
Fixes only impacting 6.3.0+
---------------------------
* Clients did not probably balance requests to the proxies. [6.3.3] `(PR #3377) <https://github.com/apple/foundationdb/pull/3377>`_
* Renamed ``MIN_DELAY_STORAGE_CANDIDACY_SECONDS`` knob to ``MIN_DELAY_CC_WORST_FIT_CANDIDACY_SECONDS``. [6.3.2] `(PR #3327) <https://github.com/apple/foundationdb/pull/3327>`_
* Refreshing TLS certificates could cause crashes. [6.3.2] `(PR #3352) <https://github.com/apple/foundationdb/pull/3352>`_
* All storage class processes attempted to connect to the same coordinator. [6.3.2] `(PR #3361) <https://github.com/apple/foundationdb/pull/3361>`_

View File

@ -1545,9 +1545,13 @@ public:
// Remove trailing slashes on path
path.erase(path.find_last_not_of("\\/") + 1);
if(!g_network->isSimulated() && path != abspath(path)) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory").detail("Description", "Backup path must be absolute (e.g. file:///some/path)").detail("URL", url).detail("Path", path);
throw io_error();
std::string absolutePath = abspath(path);
if(!g_network->isSimulated() && path != absolutePath) {
TraceEvent(SevWarn, "BackupContainerLocalDirectory").detail("Description", "Backup path must be absolute (e.g. file:///some/path)").detail("URL", url).detail("Path", path).detail("AbsolutePath", absolutePath);
// throw io_error();
IBackupContainer::lastOpenError = format("Backup path '%s' must be the absolute path '%s'", path.c_str(), absolutePath.c_str());
throw backup_invalid_url();
}
// Finalized path written to will be will be <path>/backup-<uid>

View File

@ -32,7 +32,7 @@
#include "fdbserver/RatekeeperInterface.h"
#include "fdbclient/TagThrottle.h"
#include "flow/Stats.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"
struct MasterProxyInterface {
@ -158,12 +158,14 @@ struct CommitTransactionRequest : TimedRequest {
ReplyPromise<CommitID> reply;
uint32_t flags;
Optional<UID> debugID;
Optional<TransactionCommitCostEstimation> commitCostEstimation;
Optional<TagSet> tagSet;
CommitTransactionRequest() : flags(0) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transaction, reply, arena, flags, debugID, spanContext);
serializer(ar, transaction, reply, arena, flags, debugID, commitCostEstimation, tagSet, spanContext);
}
};

View File

@ -3059,6 +3059,7 @@ void TransactionOptions::clear() {
tags = TagSet{};
readTags = TagSet{};
priority = TransactionPriority::DEFAULT;
expensiveClearCostEstimation = false;
}
TransactionOptions::TransactionOptions() {
@ -3257,6 +3258,36 @@ void Transaction::setupWatches() {
}
}
ACTOR Future<TransactionCommitCostEstimation> estimateCommitCosts(Transaction* self,
CommitTransactionRef* transaction) {
state MutationRef* it = transaction->mutations.begin();
state MutationRef* end = transaction->mutations.end();
state TransactionCommitCostEstimation trCommitCosts;
state KeyRange keyRange;
for (; it != end; ++it) {
if (it->type == MutationRef::Type::SetValue) {
trCommitCosts.bytesWrite += it->expectedSize();
trCommitCosts.numWrite++;
} else if (it->isAtomicOp()) {
trCommitCosts.bytesAtomicWrite += it->expectedSize();
trCommitCosts.numAtomicWrite++;
} else if (it->type == MutationRef::Type::ClearRange) {
trCommitCosts.numClear++;
keyRange = KeyRange(KeyRangeRef(it->param1, it->param2));
if (self->options.expensiveClearCostEstimation) {
StorageMetrics m = wait(self->getStorageMetrics(keyRange, std::numeric_limits<int>::max()));
trCommitCosts.bytesClearEst += m.bytes;
}
else {
std::vector<pair<KeyRange, Reference<LocationInfo>>> locations = wait(getKeyRangeLocations(
self->getDatabase(), keyRange, std::numeric_limits<int>::max(), false, &StorageServerInterface::getShardState, self->info));
trCommitCosts.numClearShards += locations.size();
}
}
}
return trCommitCosts;
}
ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo> trLogInfo, CommitTransactionRequest req, Future<Version> readVersion, TransactionInfo info, Version* pCommittedVersion, Transaction* tr, TransactionOptions options) {
state TraceInterval interval( "TransactionCommit" );
state double startTime = now();
@ -3273,8 +3304,12 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
commit_unknown_result()});
}
Version v = wait( readVersion );
req.transaction.read_snapshot = v;
if (!req.tagSet.present()) {
wait(store(req.transaction.read_snapshot, readVersion));
} else {
req.commitCostEstimation = TransactionCommitCostEstimation();
wait(store(req.transaction.read_snapshot, readVersion) && store(req.commitCostEstimation.get(), estimateCommitCosts(tr, &req.transaction)));
}
startTime = now();
state Optional<UID> commitID = Optional<UID>();
@ -3401,6 +3436,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
}
}
Future<Void> Transaction::commitMutations() {
try {
//if this is a read-only transaction return immediately
@ -3419,6 +3455,8 @@ Future<Void> Transaction::commitMutations() {
cx->mutationsPerCommit.addSample(tr.transaction.mutations.size());
cx->bytesPerCommit.addSample(tr.transaction.mutations.expectedSize());
if(options.tags.size())
tr.tagSet = options.tags;
size_t transactionSize = getSize();
if (transactionSize > (uint64_t)FLOW_KNOBS->PACKET_WARNING) {
@ -3693,12 +3731,17 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
span.addParent(BinaryReader::fromStringRef<UID>(value.get(), Unversioned()));
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
case FDBTransactionOptions::REPORT_CONFLICTING_KEYS:
validateOptionValue(value, false);
options.reportConflictingKeys = true;
break;
case FDBTransactionOptions::EXPENSIVE_CLEAR_COST_ESTIMATION_ENABLE:
validateOptionValue(value, false);
options.expensiveClearCostEstimation = true;
break;
default:
default:
break;
}
}

View File

@ -133,6 +133,7 @@ struct TransactionOptions {
bool firstInBatch : 1;
bool includePort : 1;
bool reportConflictingKeys : 1;
bool expensiveClearCostEstimation : 1;
TransactionPriority priority;

View File

@ -30,10 +30,10 @@
#include <sstream>
#include <string>
#include "flow/Stats.h"
#include "flow/flow.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbserver/CoordinationInterface.h"

View File

@ -136,6 +136,44 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"counter":0,
"roughness":0.0
},
"grv_latency_statistics":{
"default":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
}
},
"read_latency_statistics":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
},
"commit_latency_statistics":{
"count":0,
"min":0.0,
"max":0.0,
"median":0.0,
"mean":0.0,
"p25":0.0,
"p90":0.0,
"p95":0.0,
"p99":0.0,
"p99.9":0.0
},
"grv_latency_bands":{
"$map": 1
},

View File

@ -27,7 +27,7 @@
#include "fdbrpc/QueueModel.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.actor.h"
#include "flow/Stats.h"
#include "fdbrpc/Stats.h"
#include "fdbrpc/TimedRequest.h"
#include "fdbclient/TagThrottle.h"

View File

@ -270,6 +270,8 @@ description is not currently required but encouraged.
description="Adds a tag to the transaction that can be used to apply manual or automatic targeted throttling. At most 5 tags can be set on a transaction." />
<Option name="span_parent" code="900" paramType="Bytes" paramDescription="A byte string of length 16 used to associate the span of this transaction with a parent"
description="Adds a parent to the Span of this transaction. Used for transaction tracing. A span can be identified with any 16 bytes"/>
<Option name="expensive_clear_cost_estimation_enable" code="1000"
description="Asks storage servers for how many bytes a clear key range contains. Otherwise uses the location cache to roughly estimate this." />
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -22,6 +22,8 @@ set(FDBRPC_SRCS
ReplicationPolicy.cpp
ReplicationTypes.cpp
ReplicationUtils.cpp
Stats.actor.cpp
Stats.h
sim2.actor.cpp
sim_validation.cpp
TimedRequest.h

View File

@ -50,7 +50,7 @@ public:
return *this;
}
double mean() {
double mean() const {
if (!samples.size()) return 0;
T sum = 0;
for( int c = 0; c < samples.size(); c++ )
@ -70,8 +70,8 @@ public:
return samples[ idx ];
}
T min() { return _min; }
T max() { return _max; }
T min() const { return _min; }
T max() const { return _max; }
void clear() {
samples.clear();
@ -80,6 +80,10 @@ public:
_min = _max = 0; // Doesn't work for all T
}
uint64_t getPopulationSize() const {
return populationSize;
}
private:
int sampleSize;
uint64_t populationSize;

View File

@ -554,7 +554,9 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
self->transport->countConnEstablished++;
if (!delayedHealthUpdateF.isValid())
delayedHealthUpdateF = delayedHealthUpdate(self->destination);
wait(connectionWriter(self, conn) || reader || connectionMonitor(self));
wait(connectionWriter(self, conn) || reader || connectionMonitor(self) || self->resetConnection.onTrigger());
TraceEvent("ConnectionReset", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
throw connection_failed();
} catch (Error& e) {
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced ||
@ -565,8 +567,6 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
throw e;
}
ASSERT( false );
} catch (Error& e) {
delayedHealthUpdateF.cancel();
if(now() - self->lastConnectTime > FLOW_KNOBS->RECONNECTION_RESET_TIME) {
@ -1430,6 +1430,13 @@ Reference<AsyncVar<bool>> FlowTransport::getDegraded() {
return self->degraded;
}
void FlowTransport::resetConnection( NetworkAddress address ) {
auto peer = self->getPeer(address);
if(peer) {
peer->resetConnection.trigger();
}
}
bool FlowTransport::incompatibleOutgoingConnectionsPresent() {
return self->numIncompatibleConnections > 0;
}

View File

@ -132,6 +132,7 @@ struct Peer : public ReferenceCounted<Peer> {
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false
Future<Void> connect;
AsyncTrigger resetPing;
AsyncTrigger resetConnection;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
@ -213,6 +214,9 @@ public:
Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy.
void resetConnection( NetworkAddress address );
// Forces the connection with this address to be reset
Reference<Peer> sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection );// { cancelReliable(sendReliable(what,destination)); }
bool incompatibleOutgoingConnectionsPresent();

View File

@ -18,7 +18,7 @@
* limitations under the License.
*/
#include "flow/Stats.h"
#include "fdbrpc/Stats.h"
#include "flow/actorcompiler.h" // has to be last include
Counter::Counter(std::string const& name, CounterCollection& collection)

View File

@ -18,8 +18,8 @@
* limitations under the License.
*/
#ifndef FLOW_STATS_H
#define FLOW_STATS_H
#ifndef FDBRPC_STATS_H
#define FDBRPC_STATS_H
#pragma once
// Yet another performance statistics interface
@ -37,6 +37,7 @@ MyCounters() : foo("foo", cc), bar("bar", cc), baz("baz", cc) {}
#include <cstddef>
#include "flow/flow.h"
#include "flow/TDMetric.actor.h"
#include "fdbrpc/ContinuousSample.h"
struct ICounter {
// All counters have a name and value
@ -211,4 +212,43 @@ private:
bands.insert(std::make_pair(value, new Counter(format("Band%f", value), *cc)));
}
};
class LatencySample {
public:
LatencySample(std::string name, UID id, double loggingInterval, int sampleSize) : name(name), id(id), sample(sampleSize), sampleStart(now()) {
logger = recurring([this](){ logSample(); }, loggingInterval);
}
void addMeasurement(double measurement) {
sample.addSample(measurement);
}
private:
std::string name;
UID id;
double sampleStart;
ContinuousSample<double> sample;
Future<Void> logger;
void logSample() {
TraceEvent(name.c_str(), id)
.detail("Count", sample.getPopulationSize())
.detail("Elapsed", now() - sampleStart)
.detail("Min", sample.min())
.detail("Max", sample.max())
.detail("Mean", sample.mean())
.detail("Median", sample.median())
.detail("P25", sample.percentile(0.25))
.detail("P90", sample.percentile(0.9))
.detail("P95", sample.percentile(0.95))
.detail("P99", sample.percentile(0.99))
.detail("P99.9", sample.percentile(0.999))
.trackLatest(id.toString() + "/" + name);
sample.clear();
sampleStart = now();
}
};
#endif

View File

@ -1841,7 +1841,8 @@ Future< Reference<class IAsyncFile> > Sim2FileSystem::open( std::string filename
return f;
}
}
//Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile. This way, they can both keep up with the time to start the next operation
// Simulated disk parameters are shared by the AsyncFileNonDurable and the underlying SimpleFile.
// This way, they can both keep up with the time to start the next operation
Reference<DiskParameters> diskParameters(new DiskParameters(FLOW_KNOBS->SIM_DISK_IOPS, FLOW_KNOBS->SIM_DISK_BANDWIDTH));
machineCache[actualFilename] = AsyncFileNonDurable::open(filename, actualFilename, SimpleFile::open(filename, flags, mode, diskParameters, false), diskParameters);
}

View File

@ -144,6 +144,7 @@ public:
ProcessInfo* getProcess( Endpoint const& endpoint ) { return getProcessByAddress(endpoint.getPrimaryAddress()); }
ProcessInfo* getCurrentProcess() { return currentProcess; }
ProcessInfo const* getCurrentProcess() const { return currentProcess; }
// onProcess: wait for the process to be scheduled by the runloop; a task will be created for the process.
virtual Future<Void> onProcess( ISimulator::ProcessInfo *process, TaskPriority taskID = TaskPriority::Zero ) = 0;
virtual Future<Void> onMachine( ISimulator::ProcessInfo *process, TaskPriority taskID = TaskPriority::Zero ) = 0;

View File

@ -69,10 +69,6 @@ struct VersionedMessage {
}
};
static bool sameArena(const Arena& a, const Arena& b) {
return a.impl.getPtr() == b.impl.getPtr();
}
struct BackupData {
const UID myId;
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
@ -340,11 +336,10 @@ struct BackupData {
for (int i = 0; i < num; i++) {
const Arena& a = messages[i].arena;
const Arena& b = messages[i + 1].arena;
if (!sameArena(a, b)) {
if (!a.sameArena(b)) {
bytes += messages[i].bytes;
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId)
.detail("Release", messages[i].bytes)
.detail("Arena", (void*)a.impl.getPtr());
.detail("Release", messages[i].bytes);
}
}
lock->release(bytes);
@ -904,10 +899,9 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
// Note we aggressively peek (uncommitted) messages, but only committed
// messages/mutations will be flushed to disk/blob in uploadData().
while (r->hasMessage()) {
if (!sameArena(prev, r->arena())) {
if (!prev.sameArena(r->arena())) {
TraceEvent(SevDebugMemory, "BackupWorkerMemory", self->myId)
.detail("Take", r->arena().getSize())
.detail("Arena", (void*)r->arena().impl.getPtr())
.detail("Current", self->lock->activePermits());
wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));

View File

@ -774,6 +774,14 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
bool foundSrc = false;
for( int i = 0; i < req.src.size(); i++ ) {
if( self->server_info.count( req.src[i] ) ) {
foundSrc = true;
break;
}
}
// Select the best team
// Currently the metric is minimum used disk space (adjusted for data in flight)
// Only healthy teams may be selected. The team has to be healthy at the moment we update
@ -784,7 +792,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// self->teams.size() can be 0 under the ConfigureTest.txt test when we change configurations
// The situation happens rarely. We may want to eliminate this situation someday
if( !self->teams.size() ) {
req.reply.send( Optional<Reference<IDataDistributionTeam>>() );
req.reply.send( std::make_pair(Optional<Reference<IDataDistributionTeam>>(), foundSrc) );
return Void();
}
@ -810,7 +818,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
if(found && teamList[j]->isHealthy()) {
req.reply.send( teamList[j] );
bestOption = teamList[j];
req.reply.send( std::make_pair(bestOption, foundSrc) );
return Void();
}
}
@ -901,7 +910,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
}
}
if(found) {
req.reply.send( teamList[j] );
bestOption = teamList[j];
req.reply.send( std::make_pair(bestOption, foundSrc) );
return Void();
}
}
@ -912,7 +922,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// self->traceAllInfo(true);
// }
req.reply.send( bestOption );
req.reply.send( std::make_pair(bestOption, foundSrc) );
return Void();
} catch( Error &e ) {
@ -3156,7 +3166,8 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
: SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY);
}
} else {
TEST(true); // A removed server is still associated with a team in SABTF
TEST(true); // A removed server is still associated with a team in
// ShardsAffectedByTeamFailure
}
}
}
@ -4579,8 +4590,9 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
shardsAffectedByTeamFailure->moveShard(keys, teams);
if (initData->shards[shard].hasDest) {
// This shard is already in flight. Ideally we should use dest in sABTF and generate a dataDistributionRelocator directly in
// DataDistributionQueue to track it, but it's easier to just (with low priority) schedule it for movement.
// This shard is already in flight. Ideally we should use dest in ShardsAffectedByTeamFailure and
// generate a dataDistributionRelocator directly in DataDistributionQueue to track it, but it's
// easier to just (with low priority) schedule it for movement.
bool unhealthy = initData->shards[shard].primarySrc.size() != configuration.storageTeamSize;
if (!unhealthy && configuration.usableRegions > 1) {
unhealthy = initData->shards[shard].remoteSrc.size() != configuration.storageTeamSize;

View File

@ -77,7 +77,8 @@ struct GetTeamRequest {
bool teamMustHaveShards;
double inflightPenalty;
std::vector<UID> completeSources;
Promise< Optional< Reference<IDataDistributionTeam> > > reply;
std::vector<UID> src;
Promise< std::pair<Optional<Reference<IDataDistributionTeam>>,bool> > reply;
GetTeamRequest() {}
GetTeamRequest( bool wantsNewServers, bool wantsTrueBest, bool preferLowerUtilization, bool teamMustHaveShards, double inflightPenalty = 1.0 )

View File

@ -358,6 +358,7 @@ struct DDQueueData {
FlowLock startMoveKeysParallelismLock;
FlowLock finishMoveKeysParallelismLock;
Reference<FlowLock> fetchSourceLock;
int activeRelocations;
int queuedRelocations;
@ -380,7 +381,7 @@ struct DDQueueData {
Promise<Void> error;
PromiseStream<RelocateData> dataTransferComplete;
PromiseStream<RelocateData> relocationComplete;
PromiseStream<RelocateData> fetchSourceServersComplete;
PromiseStream<RelocateData> fetchSourceServersComplete; // find source SSs for a relocate range
PromiseStream<RelocateShard> output;
FutureStream<RelocateShard> input;
@ -390,7 +391,8 @@ struct DDQueueData {
double lastInterval;
int suppressIntervals;
Reference<AsyncVar<bool>> rawProcessingUnhealthy; //many operations will remove relocations before adding a new one, so delay a small time before settling on a new number.
Reference<AsyncVar<bool>> rawProcessingUnhealthy; // many operations will remove relocations before adding a new
// one, so delay a small time before settling on a new number.
std::map<int, int> priority_relocations;
int unhealthyRelocations;
@ -425,7 +427,7 @@ struct DDQueueData {
activeRelocations( 0 ), queuedRelocations( 0 ), bytesWritten ( 0 ), teamCollections( teamCollections ),
shardsAffectedByTeamFailure( sABTF ), getAverageShardBytes( getAverageShardBytes ), distributorId( mid ), lock( lock ),
cx( cx ), teamSize( teamSize ), singleRegionTeamSize( singleRegionTeamSize ), output( output ), input( input ), getShardMetrics( getShardMetrics ), startMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ),
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), lastLimited(lastLimited),
finishMoveKeysParallelismLock( SERVER_KNOBS->DD_MOVE_KEYS_PARALLELISM ), fetchSourceLock( new FlowLock(SERVER_KNOBS->DD_FETCH_SOURCE_PARALLELISM) ), lastLimited(lastLimited),
suppressIntervals(0), lastInterval(0), unhealthyRelocations(0), rawProcessingUnhealthy( new AsyncVar<bool>(false) ) {}
void validate() {
@ -531,7 +533,7 @@ struct DDQueueData {
}
}
ACTOR Future<Void> getSourceServersForRange( Database cx, RelocateData input, PromiseStream<RelocateData> output ) {
ACTOR Future<Void> getSourceServersForRange( Database cx, RelocateData input, PromiseStream<RelocateData> output, Reference<FlowLock> fetchLock ) {
state std::set<UID> servers;
state Transaction tr(cx);
@ -542,6 +544,9 @@ struct DDQueueData {
wait( delay( 0.0001, TaskPriority::DataDistributionLaunch ) );
}
wait( fetchLock->take( TaskPriority::DataDistributionLaunch ) );
state FlowLock::Releaser releaser( *fetchLock );
loop {
servers.clear();
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
@ -683,7 +688,7 @@ struct DDQueueData {
startRelocation(rrs.priority, rrs.healthPriority);
fetchingSourcesQueue.insert( rrs );
getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete ) );
getSourceActors.insert( rrs.keys, getSourceServersForRange( cx, rrs, fetchSourceServersComplete, fetchSourceLock ) );
} else {
RelocateData newData( rrs );
newData.keys = affectedQueuedItems[r];
@ -783,8 +788,8 @@ struct DDQueueData {
}
// For each relocateData rd in the queue, check if there exist inflight relocate data whose keyrange is overlapped
// with rd. If there exist, cancel them by cancel their actors and reduce the src servers' busyness of those
// canceled inflight relocateData Launch the relocation for the rd.
// with rd. If there exist, cancel them by cancelling their actors and reducing the src servers' busyness of those
// canceled inflight relocateData. Launch the relocation for the rd.
void launchQueuedWork( std::set<RelocateData, std::greater<RelocateData>> combined ) {
int startedHere = 0;
double startTime = now();
@ -938,36 +943,36 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
allHealthy = true;
anyWithSource = false;
bestTeams.clear();
// Get team from teamCollections in diffrent DCs and find the best one
while( tciIndex < self->teamCollections.size() ) {
double inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_HEALTHY;
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_UNHEALTHY || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_2_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_UNHEALTHY;
if(rd.healthPriority == SERVER_KNOBS->PRIORITY_POPULATE_REGION || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_1_LEFT || rd.healthPriority == SERVER_KNOBS->PRIORITY_TEAM_0_LEFT) inflightPenalty = SERVER_KNOBS->INFLIGHT_PENALTY_ONE_LEFT;
auto req = GetTeamRequest(rd.wantsNewServers, rd.priority == SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, true, false, inflightPenalty);
req.src = rd.src;
req.completeSources = rd.completeSources;
Optional<Reference<IDataDistributionTeam>> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
// bestTeam.second = false if the bestTeam in the teamCollection (in the DC) does not have any
// server that hosts the relocateData. This is possible, for example, in a fearless configuration
// when the remote DC is just brought up.
std::pair<Optional<Reference<IDataDistributionTeam>>,bool> bestTeam = wait(brokenPromiseToNever(self->teamCollections[tciIndex].getTeam.getReply(req)));
// If a DC has no healthy team, we stop checking the other DCs until
// the unhealthy DC is healthy again or is excluded.
if(!bestTeam.present()) {
if(!bestTeam.first.present()) {
foundTeams = false;
break;
}
if(!bestTeam.get()->isHealthy()) {
if(!bestTeam.first.get()->isHealthy()) {
allHealthy = false;
} else {
anyHealthy = true;
}
bool foundSource = false;
if(!rd.wantsNewServers && self->teamCollections.size() > 1) {
for(auto& it : bestTeam.get()->getServerIDs()) {
if(std::find(rd.src.begin(), rd.src.end(), it) != rd.src.end()) {
foundSource = true;
anyWithSource = true;
break;
}
}
if(bestTeam.second) {
anyWithSource = true;
}
bestTeams.push_back(std::make_pair(bestTeam.get(), foundSource));
bestTeams.push_back(std::make_pair(bestTeam.first.get(), bestTeam.second));
tciIndex++;
}
if (foundTeams && anyHealthy) {
@ -994,8 +999,10 @@ ACTOR Future<Void> dataDistributionRelocator( DDQueueData *self, RelocateData rd
destinationTeams.push_back(ShardsAffectedByTeamFailure::Team(serverIds, i == 0));
if (allHealthy && anyWithSource && !bestTeams[i].second) {
// When all teams in bestTeams[i] do not hold the shard
// We randomly choose a server in bestTeams[i] as the shard's destination and
// When all servers in bestTeams[i] do not hold the shard (!bestTeams[i].second), it indicates
// the bestTeams[i] is in a new DC where data has not been replicated to.
// To move data (specified in RelocateShard) to bestTeams[i] in the new DC AND reduce data movement
// across DC, we randomly choose a server in bestTeams[i] as the shard's destination, and
// move the shard to the randomly chosen server (in the remote DC), which will later
// propogate its data to the servers in the same team. This saves data movement bandwidth across DC
int idx = deterministicRandom()->randomInt(0, serverIds.size());
@ -1202,7 +1209,7 @@ ACTOR Future<bool> rebalanceTeams( DDQueueData* self, int priority, Reference<ID
return false;
}
//verify the shard is still in sabtf
// Verify the shard is still in ShardsAffectedByTeamFailure
shards = self->shardsAffectedByTeamFailure->getShardsFor( ShardsAffectedByTeamFailure::Team( sourceTeam->getServerIDs(), primary ) );
for( int i = 0; i < shards.size(); i++ ) {
if( moveShard == shards[i] ) {
@ -1223,7 +1230,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state Optional<Reference<IDataDistributionTeam>> randomTeam;
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId);
traceEvent.suppressFor(5.0)
@ -1259,26 +1266,26 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
Optional<Reference<IDataDistributionTeam>> _randomTeam = wait(brokenPromiseToNever(
std::pair<Optional<Reference<IDataDistributionTeam>>,bool> _randomTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true, false))));
randomTeam = _randomTeam;
traceEvent.detail("DestTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
traceEvent.detail("DestTeam", printable(randomTeam.first.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (randomTeam.present()) {
Optional<Reference<IDataDistributionTeam>> loadedTeam =
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>,bool> loadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(true, true, false, true))));
traceEvent.detail("SourceTeam", printable(loadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
traceEvent.detail("SourceTeam", printable(loadedTeam.first.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (loadedTeam.present()) {
if (loadedTeam.first.present()) {
bool _moved =
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.get(),
randomTeam.get(), teamCollectionIndex == 0, &traceEvent));
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM, loadedTeam.first.get(),
randomTeam.first.get(), teamCollectionIndex == 0, &traceEvent));
moved = _moved;
if (moved) {
resetCount = 0;
@ -1323,7 +1330,7 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
state bool skipCurrentLoop = false;
loop {
state Optional<Reference<IDataDistributionTeam>> randomTeam;
state std::pair<Optional<Reference<IDataDistributionTeam>>, bool> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId);
traceEvent.suppressFor(5.0)
@ -1359,25 +1366,25 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
Optional<Reference<IDataDistributionTeam>> _randomTeam = wait(brokenPromiseToNever(
std::pair<Optional<Reference<IDataDistributionTeam>>,bool> _randomTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false, true))));
randomTeam = _randomTeam;
traceEvent.detail("SourceTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
traceEvent.detail("SourceTeam", printable(randomTeam.first.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (randomTeam.present()) {
Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait(brokenPromiseToNever(
if (randomTeam.first.present()) {
std::pair<Optional<Reference<IDataDistributionTeam>>,bool> unloadedTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, true, true, false))));
traceEvent.detail("DestTeam", printable(unloadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
traceEvent.detail("DestTeam", printable(unloadedTeam.first.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (unloadedTeam.present()) {
if (unloadedTeam.first.present()) {
bool _moved =
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.get(),
unloadedTeam.get(), teamCollectionIndex == 0, &traceEvent));
wait(rebalanceTeams(self, SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM, randomTeam.first.get(),
unloadedTeam.first.get(), teamCollectionIndex == 0, &traceEvent));
moved = _moved;
if (moved) {
resetCount = 0;

View File

@ -90,6 +90,12 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( TLOG_MAX_CREATE_DURATION, 10.0 );
init( PEEK_LOGGING_AMOUNT, 5 );
init( PEEK_LOGGING_DELAY, 5.0 );
init( PEEK_RESET_INTERVAL, 300.0 ); if ( randomize && BUGGIFY ) PEEK_RESET_INTERVAL = 20.0;
init( PEEK_MAX_LATENCY, 0.5 ); if ( randomize && BUGGIFY ) PEEK_MAX_LATENCY = 0.0;
init( PEEK_COUNT_SMALL_MESSAGES, false ); if ( randomize && BUGGIFY ) PEEK_COUNT_SMALL_MESSAGES = true;
init( PEEK_STATS_INTERVAL, 10.0 );
init( PEEK_STATS_SLOW_AMOUNT, 0 );
init( PEEK_STATS_SLOW_RATIO, 0.5 );
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
@ -216,6 +222,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( DD_SHARD_SIZE_GRANULARITY, 5000000 );
init( DD_SHARD_SIZE_GRANULARITY_SIM, 500000 ); if( randomize && BUGGIFY ) DD_SHARD_SIZE_GRANULARITY_SIM = 0;
init( DD_MOVE_KEYS_PARALLELISM, 15 ); if( randomize && BUGGIFY ) DD_MOVE_KEYS_PARALLELISM = 1;
init( DD_FETCH_SOURCE_PARALLELISM, 1000 ); if( randomize && BUGGIFY ) DD_FETCH_SOURCE_PARALLELISM = 1;
init( DD_MERGE_LIMIT, 2000 ); if( randomize && BUGGIFY ) DD_MERGE_LIMIT = 2;
init( DD_SHARD_METRICS_TIMEOUT, 60.0 ); if( randomize && BUGGIFY ) DD_SHARD_METRICS_TIMEOUT = 0.1;
init( DD_LOCATION_CACHE_SIZE, 2000000 ); if( randomize && BUGGIFY ) DD_LOCATION_CACHE_SIZE = 3;
@ -640,6 +647,10 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( REDWOOD_REMAP_CLEANUP_VERSION_LAG_MIN, 4 );
init( REDWOOD_REMAP_CLEANUP_VERSION_LAG_MAX, 15 );
init( REDWOOD_LOGGING_INTERVAL, 5.0 );
// Server request latency measurement
init( LATENCY_SAMPLE_SIZE, 100000 );
init( LATENCY_METRICS_LOGGING_INTERVAL, 60.0 );
// clang-format on

View File

@ -88,6 +88,12 @@ public:
double TLOG_MAX_CREATE_DURATION;
int PEEK_LOGGING_AMOUNT;
double PEEK_LOGGING_DELAY;
double PEEK_RESET_INTERVAL;
double PEEK_MAX_LATENCY;
bool PEEK_COUNT_SMALL_MESSAGES;
double PEEK_STATS_INTERVAL;
double PEEK_STATS_SLOW_AMOUNT;
double PEEK_STATS_SLOW_RATIO;
// Data distribution queue
double HEALTH_POLL_TIME;
@ -165,6 +171,7 @@ public:
int64_t DD_SHARD_SIZE_GRANULARITY;
int64_t DD_SHARD_SIZE_GRANULARITY_SIM;
int DD_MOVE_KEYS_PARALLELISM;
int DD_FETCH_SOURCE_PARALLELISM;
int DD_MERGE_LIMIT;
double DD_SHARD_METRICS_TIMEOUT;
int64_t DD_LOCATION_CACHE_SIZE;
@ -572,6 +579,10 @@ public:
int REDWOOD_REMAP_CLEANUP_VERSION_LAG_MIN; // Number of versions between head of remap queue and oldest retained version before remap cleanup starts
int REDWOOD_REMAP_CLEANUP_VERSION_LAG_MAX; // Number of versions between head of remap queue and oldest retained version before remap cleanup may stop
double REDWOOD_LOGGING_INTERVAL;
// Server request latency measurement
int LATENCY_SAMPLE_SIZE;
double LATENCY_METRICS_LOGGING_INTERVAL;
ServerKnobs();
void initialize(bool randomize = false, ClientKnobs* clientKnobs = NULL, bool isSimulated = false);

View File

@ -20,6 +20,7 @@
#include "flow/ActorCollection.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/Knobs.h"
@ -30,7 +31,6 @@
#include "fdbserver/RecoveryState.h"
#include "fdbclient/Atomic.h"
#include "flow/TDMetric.actor.h"
#include "flow/Stats.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct LogRouterData {

View File

@ -407,6 +407,12 @@ struct ILogSystem {
Deque<Future<TLogPeekReply>> futureResults;
Future<Void> interfaceChanged;
double lastReset;
Future<Void> resetCheck;
int slowReplies;
int fastReplies;
int unknownReplies;
ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore );
ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag );

View File

@ -26,14 +26,17 @@
#include "flow/actorcompiler.h" // has to be last include
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore) {
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(0),
returnIfBlocked(returnIfBlocked), sequence(0), onlySpilled(false), parallelGetMore(parallelGetMore), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}
ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, LogMessageVersion const& messageVersion, LogMessageVersion const& end, TagsAndMessage const& message, bool hasMsg, Version poppedVersion, Tag tag )
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageAndTags(message), hasMsg(hasMsg), randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false)
: results(results), tag(tag), rd(results.arena, results.messages, Unversioned()), messageVersion(messageVersion), end(end), messageAndTags(message), hasMsg(hasMsg),
randomID(deterministicRandom()->randomUniqueID()), poppedVersion(poppedVersion), returnIfBlocked(false), sequence(0), onlySpilled(false), parallelGetMore(false), lastReset(0), slowReplies(0), fastReplies(0), unknownReplies(0), resetCheck(Void())
{
//TraceEvent("SPC_Clone", randomID);
this->results.maxKnownVersion = 0;
@ -133,6 +136,46 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
}
}
ACTOR Future<Void> resetChecker( ILogSystem::ServerPeekCursor* self, NetworkAddress addr ) {
self->slowReplies = 0;
self->unknownReplies = 0;
self->fastReplies = 0;
wait(delay(SERVER_KNOBS->PEEK_STATS_INTERVAL));
TraceEvent("SlowPeekStats").detail("SlowReplies", self->slowReplies).detail("FastReplies", self->fastReplies).detail("UnknownReplies", self->unknownReplies);
if(self->slowReplies >= SERVER_KNOBS->PEEK_STATS_SLOW_AMOUNT && self->slowReplies/double(self->slowReplies+self->fastReplies) >= SERVER_KNOBS->PEEK_STATS_SLOW_RATIO) {
FlowTransport::transport().resetConnection(addr);
self->lastReset = now();
}
return Void();
}
ACTOR Future<TLogPeekReply> recordRequestMetrics( ILogSystem::ServerPeekCursor* self, NetworkAddress addr, Future<TLogPeekReply> in ) {
try {
state double startTime = now();
TLogPeekReply t = wait(in);
if(now()-self->lastReset > SERVER_KNOBS->PEEK_RESET_INTERVAL) {
if(now()-startTime > SERVER_KNOBS->PEEK_MAX_LATENCY) {
if(t.messages.size() >= SERVER_KNOBS->DESIRED_TOTAL_BYTES || SERVER_KNOBS->PEEK_COUNT_SMALL_MESSAGES) {
if(self->resetCheck.isReady()) {
self->resetCheck = resetChecker(self, addr);
}
self->slowReplies++;
} else {
self->unknownReplies++;
}
} else {
self->fastReplies++;
}
}
return t;
} catch (Error& e) {
if (e.code() != error_code_broken_promise)
throw;
wait(Never()); // never return
throw internal_error(); // does not happen
}
}
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, TaskPriority taskID ) {
if( !self->interf || self->messageVersion >= self->end ) {
if( self->hasMessage() )
@ -150,7 +193,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
try {
if (self->parallelGetMore || self->onlySpilled) {
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
self->futureResults.push_back( recordRequestMetrics( self, self->interf->get().interf().peekMessages.getEndpoint().getPrimaryAddress(), self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, self->onlySpilled, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
}
if (self->sequence == std::numeric_limits<decltype(self->sequence)>::max()) {
throw operation_obsolete();

View File

@ -28,6 +28,7 @@
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistributorInterface.h"
@ -46,7 +47,6 @@
#include "flow/ActorCollection.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Stats.h"
#include "flow/TDMetric.actor.h"
#include "flow/Tracing.h"
#include "flow/actorcompiler.h" // This must be the last #include.
@ -92,6 +92,9 @@ struct ProxyStats {
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
Version lastCommitVersionAssigned;
LatencySample commitLatencySample;
LatencySample grvLatencySample;
LatencyBands commitLatencyBands;
LatencyBands grvLatencyBands;
@ -140,6 +143,8 @@ struct ProxyStats {
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastAssignedCommitVersion", [this](){return this->lastCommitVersionAssigned;});
@ -226,10 +231,13 @@ struct TransactionRateInfo {
}
};
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, TransactionRateInfo *transactionRateInfo,
TransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter, PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, TransactionRateInfo* transactionRateInfo,
TransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags,
TransactionTagMap<TransactionCommitCostEstimation>* transactionTagCommitCostEst) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -253,8 +261,18 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, detailed)));
TransactionTagMap<uint64_t> tagCounts;
for(auto itr : *throttledTags) {
for(auto priorityThrottles : itr.second) {
tagCounts[priorityThrottles.first] = (*transactionTagCounter)[priorityThrottles.first];
}
}
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(
GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter,
*transactionTagCommitCostEst, detailed)));
transactionTagCounter->clear();
transactionTagCommitCostEst->clear();
expectingDetailedReply = detailed;
}
when ( GetRateInfoReply rep = wait(reply) ) {
@ -429,6 +447,7 @@ struct ProxyCommitData {
NotifiedDouble lastCommitTime;
vector<double> commitComputePerOperation;
TransactionTagMap<TransactionCommitCostEstimation> transactionTagCommitCostEst;
//The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly more CPU efficient.
//When a tag related to a storage server does change, we empty out all of these vectors to signify they must be repopulated.
@ -1320,6 +1339,14 @@ ACTOR Future<Void> commitBatch(
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
ASSERT_WE_THINK(commitVersion != invalidVersion);
trs[t].reply.send(CommitID(commitVersion, t, metadataVersionAfter));
// aggregate commit cost estimation if committed
ASSERT(trs[t].commitCostEstimation.present() == trs[t].tagSet.present());
if (trs[t].tagSet.present()) {
TransactionCommitCostEstimation& costEstimation = trs[t].commitCostEstimation.get();
for (auto& tag : trs[t].tagSet.get()) {
self->transactionTagCommitCostEst[tag] += costEstimation;
}
}
}
else if (committed[t] == ConflictBatch::TransactionTooOld) {
trs[t].reply.sendError(transaction_too_old());
@ -1353,9 +1380,11 @@ ACTOR Future<Void> commitBatch(
for (int resolverInd : transactionResolverMap[t]) nextTr[resolverInd]++;
// TODO: filter if pipelined with large commit
double duration = endTime - trs[t].requestTime();
self->stats.commitLatencySample.addMeasurement(duration);
if(self->latencyBandConfig.present()) {
bool filter = maxTransactionBytes > self->latencyBandConfig.get().commitConfig.maxCommitBytes.orDefault(std::numeric_limits<int>::max());
self->stats.commitLatencyBands.addMeasurement(endTime - trs[t].requestTime(), filter);
self->stats.commitLatencyBands.addMeasurement(duration, filter);
}
}
@ -1474,8 +1503,12 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
double end = g_network->timer();
for(GetReadVersionRequest const& request : requests) {
double duration = end - request.requestTime();
if(request.priority == TransactionPriority::DEFAULT) {
stats->grvLatencySample.addMeasurement(duration);
}
if(request.priority >= TransactionPriority::DEFAULT) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
stats->grvLatencyBands.addMeasurement(duration);
}
if (request.flags & GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION) {
@ -1494,8 +1527,13 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
auto tagItr = priorityThrottledTags.find(tag.first);
if(tagItr != priorityThrottledTags.end()) {
if(tagItr->second.expiration > now()) {
TEST(true); // Proxy returning tag throttle
reply.tagThrottleInfo[tag.first] = tagItr->second;
if(tagItr->second.tpsRate == std::numeric_limits<double>::max()) {
TEST(true); // Auto TPS rate is unlimited
}
else {
TEST(true); // Proxy returning tag throttle
reply.tagThrottleInfo[tag.first] = tagItr->second;
}
}
else {
// This isn't required, but we might as well
@ -1540,7 +1578,9 @@ ACTOR static Future<Void> transactionStarter(
state PromiseStream<double> replyTimes;
state Span span;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo,
healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags,
&(commitData->transactionTagCommitCostEst)));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo,
&transactionTagCounter));

View File

@ -19,7 +19,6 @@
*/
#include "flow/Hash3.h"
#include "flow/Stats.h"
#include "flow/UnitTest.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
@ -33,6 +32,7 @@
#include "fdbrpc/FailureMonitor.h"
#include "fdbserver/IDiskQueue.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/WaitFailure.h"

View File

@ -19,7 +19,6 @@
*/
#include "flow/Hash3.h"
#include "flow/Stats.h"
#include "flow/UnitTest.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
@ -35,6 +34,7 @@
#include "fdbserver/IDiskQueue.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/WaitFailure.h"

View File

@ -20,7 +20,6 @@
*/
#include "flow/Hash3.h"
#include "flow/Stats.h"
#include "flow/UnitTest.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
@ -36,6 +35,7 @@
#include "fdbserver/IDiskQueue.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/WaitFailure.h"

View File

@ -149,7 +149,7 @@ private:
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(Optional<double> requestRate) {
if(limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0) {
if(limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
return limits.tpsRate;
}
else {
@ -162,7 +162,7 @@ private:
double targetRate = getTargetRate(requestRate);
if(targetRate == std::numeric_limits<double>::max()) {
rateSet = false;
return Optional<double>();
return targetRate;
}
if(!rateSet) {
rateSet = true;
@ -184,6 +184,10 @@ private:
}
};
void initializeTag(TransactionTag const& tag) {
tagData.try_emplace(tag);
}
public:
RkTagThrottleCollection() {}
@ -217,17 +221,30 @@ public:
ASSERT(!expiration.present() || expiration.get() > now());
auto itr = autoThrottledTags.find(tag);
if(itr == autoThrottledTags.end() && autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
TEST(true); // Reached auto-throttle limit
return Optional<double>();
bool present = (itr != autoThrottledTags.end());
if(!present) {
if(autoThrottledTags.size() >= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
TEST(true); // Reached auto-throttle limit
return Optional<double>();
}
itr = autoThrottledTags.try_emplace(tag).first;
initializeTag(tag);
}
else if(itr->second.limits.expiration <= now()) {
TEST(true); // Re-throttling expired tag that hasn't been cleaned up
present = false;
itr->second = RkTagThrottleData();
}
itr = autoThrottledTags.try_emplace(itr, tag);
auto &throttle = itr->second;
if(!tpsRate.present()) {
if(now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
tpsRate = std::numeric_limits<double>::max();
if(present) {
return Optional<double>();
}
}
else if(now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
TEST(true); // Tag auto-throttled too quickly
@ -253,15 +270,19 @@ public:
ASSERT(tpsRate.present() && tpsRate.get() >= 0);
TraceEvent("RkSetAutoThrottle", id)
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
.detail("Expiration", expiration.get() - now());
throttle.limits.tpsRate = tpsRate.get();
throttle.limits.expiration = expiration.get();
throttle.updateAndGetClientRate(getRequestRate(tag));
Optional<double> clientRate = throttle.updateAndGetClientRate(getRequestRate(tag));
TraceEvent("RkSetAutoThrottle", id)
.detail("Tag", tag)
.detail("TargetRate", tpsRate.get())
.detail("Expiration", expiration.get() - now())
.detail("ClientRate", clientRate)
.detail("Created", now()-throttle.created)
.detail("LastUpdate", now()-throttle.lastUpdated)
.detail("LastReduced", now()-throttle.lastReduced);
if(tpsRate.get() != std::numeric_limits<double>::max()) {
return tpsRate.get();
@ -278,6 +299,7 @@ public:
auto &priorityThrottleMap = manualThrottledTags[tag];
auto result = priorityThrottleMap.try_emplace(priority);
initializeTag(tag);
ASSERT(result.second); // Updating to the map is done by copying the whole map
result.first->second.limits.tpsRate = tpsRate;
@ -365,7 +387,8 @@ public:
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if(autoClientRate.present()) {
double adjustedRate = autoClientRate.get();
if(now() >= autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) {
double rampStartTime = autoItr->second.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
if(now() >= rampStartTime && adjustedRate != std::numeric_limits<double>::max()) {
TEST(true); // Tag auto-throttle ramping up
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
@ -373,7 +396,7 @@ public:
targetBusyness = 0.01;
}
double rampLocation = (now() - autoItr->second.lastReduced - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) / targetBusyness;
double rampLocation = (now() - rampStartTime) / SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME;
adjustedRate = computeTargetTpsRate(targetBusyness, pow(targetBusyness, 1 - rampLocation), adjustedRate);
}
@ -388,8 +411,14 @@ public:
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
else {
ASSERT(autoItr->second.limits.expiration <= now());
TEST(true); // Auto throttle expired
autoThrottledTags.erase(autoItr);
if(BUGGIFY) { // Temporarily extend the window between expiration and cleanup
tagPresent = true;
}
else {
autoThrottledTags.erase(autoItr);
}
}
}
@ -406,23 +435,24 @@ public:
}
void addRequests(TransactionTag const& tag, int requests) {
TEST(true); // Requests reported for throttled tag
ASSERT(requests > 0);
if(requests > 0) {
TEST(true); // Requests reported for throttled tag
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
auto tagItr = tagData.try_emplace(tag);
tagItr.first->second.requestRate.addDelta(requests);
double requestRate = tagItr.first->second.requestRate.smoothRate();
auto autoItr = autoThrottledTags.find(tag);
if(autoItr != autoThrottledTags.end()) {
autoItr->second.updateAndGetClientRate(requestRate);
}
double requestRate = tagItr.first->second.requestRate.smoothRate();
auto autoItr = autoThrottledTags.find(tag);
if(autoItr != autoThrottledTags.end()) {
autoItr->second.updateAndGetClientRate(requestRate);
}
auto manualItr = manualThrottledTags.find(tag);
if(manualItr != manualThrottledTags.end()) {
for(auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end(); ++priorityItr) {
priorityItr->second.updateAndGetClientRate(requestRate);
auto manualItr = manualThrottledTags.find(tag);
if(manualItr != manualThrottledTags.end()) {
for(auto priorityItr = manualItr->second.begin(); priorityItr != manualItr->second.end(); ++priorityItr) {
priorityItr->second.updateAndGetClientRate(requestRate);
}
}
}
}
@ -810,7 +840,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
}
}
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagThrottleCollection& throttledTags) {
void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST) {
TEST(true); // Transaction tag auto-throttled
@ -824,7 +854,7 @@ void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagT
}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RkTagThrottleCollection& throttledTags) {
void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
@ -892,7 +922,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RkTagThrottleCol
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
tryAutoThrottleTag(self, ss, throttledTags);
tryAutoThrottleTag(self, ss);
}
double inputRate = ss.smoothInputBytes.smoothRate();
@ -1162,8 +1192,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RkTagThrottleCol
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
.detail("TagsAutoThrottled", throttledTags.autoThrottleCount())
.detail("TagsManuallyThrottled", throttledTags.manualThrottleCount())
.detail("TagsAutoThrottled", self->throttledTags.autoThrottleCount())
.detail("TagsManuallyThrottled", self->throttledTags.manualThrottleCount())
.trackLatest(name);
}
}
@ -1227,8 +1257,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
state bool lastLimited = false;
loop choose {
when (wait( timeout )) {
updateRate(&self, &self.normalLimits, self.throttledTags);
updateRate(&self, &self.batchLimits, self.throttledTags);
updateRate(&self, &self.normalLimits);
updateRate(&self, &self.batchLimits);
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
double tooOld = now() - 1.0;
@ -1251,6 +1281,10 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
for(auto tag : req.throttledTagCounts) {
self.throttledTags.addRequests(tag.first, tag.second);
}
// TODO process commitCostEstimation
// for (const auto &[tagName, cost] : req.throttledTagCommitCostEst) {
//
// }
}
if(p.batchTransactions > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions );

View File

@ -75,6 +75,32 @@ struct ClientTagThrottleLimits {
}
};
struct TransactionCommitCostEstimation {
int numWrite = 0;
int numAtomicWrite = 0;
int numClear = 0;
int numClearShards = 0;
uint64_t bytesWrite = 0;
uint64_t bytesAtomicWrite = 0;
uint64_t bytesClearEst = 0;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, bytesWrite, bytesClearEst, bytesAtomicWrite, numWrite, numAtomicWrite, numClear, numClearShards);
}
TransactionCommitCostEstimation& operator+=(const TransactionCommitCostEstimation& other) {
numWrite += other.numWrite;
numAtomicWrite += other.numAtomicWrite;
numClear += other.numClear;
bytesWrite += other.bytesWrite;
bytesAtomicWrite += other.numAtomicWrite;
numClearShards += other.numClearShards;
bytesClearEst += other.bytesClearEst;
return *this;
}
};
struct GetRateInfoReply {
constexpr static FileIdentifier file_identifier = 7845006;
double transactionRate;
@ -97,16 +123,21 @@ struct GetRateInfoRequest {
int64_t batchReleasedTransactions;
TransactionTagMap<uint64_t> throttledTagCounts;
TransactionTagMap<TransactionCommitCostEstimation> throttledTagCommitCostEst;
bool detailed;
ReplyPromise<struct GetRateInfoReply> reply;
GetRateInfoRequest() {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, TransactionTagMap<uint64_t> throttledTagCounts, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), throttledTagCounts(throttledTagCounts), detailed(detailed) {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions,
TransactionTagMap<uint64_t> throttledTagCounts,
TransactionTagMap<TransactionCommitCostEstimation> throttledTagCommitCostEst, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions),
batchReleasedTransactions(batchReleasedTransactions), throttledTagCounts(throttledTagCounts),
throttledTagCommitCostEst(throttledTagCommitCostEst), detailed(detailed) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply, throttledTagCommitCostEst);
}
};

View File

@ -28,12 +28,12 @@
#define FDBSERVER_RESTORE_APPLIER_H
#include <sstream>
#include "flow/Stats.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/MutationTracking.h"

View File

@ -28,10 +28,10 @@
#define FDBSERVER_RESTORE_LOADER_H
#include <sstream>
#include "flow/Stats.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbrpc/Locality.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"

View File

@ -28,12 +28,12 @@
#define FDBSERVER_RESTORE_MASTER_H
#include <sstream>
#include "flow/Stats.h"
#include "flow/Platform.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbserver/RestoreUtil.h"
#include "fdbserver/RestoreRoleCommon.actor.h"

View File

@ -29,13 +29,13 @@
#define FDBSERVER_RestoreRoleCommon_H
#include <sstream>
#include "flow/Stats.h"
#include "flow/SystemMonitor.h"
#include "fdbclient/FDBTypes.h"
#include "fdbclient/CommitTransaction.h"
#include "fdbclient/Notified.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/CoordinationInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"
#include "fdbserver/RestoreUtil.h"

View File

@ -29,10 +29,10 @@
#include "fdbclient/Tuple.h"
#include "fdbclient/CommitTransaction.h"
#include "flow/flow.h"
#include "flow/Stats.h"
#include "fdbrpc/TimedRequest.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/Stats.h"
#include <cstdint>
#include <cstdarg>

View File

@ -28,9 +28,9 @@
#include "fdbclient/Tuple.h"
#include "flow/flow.h"
#include "flow/Stats.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/IAsyncFile.h"
#include "fdbrpc/Stats.h"
#include <cstdint>
#include <cstdarg>

View File

@ -1262,7 +1262,7 @@ void setupSimulatedSystem(vector<Future<Void>>* systemActors, std::string baseFo
ips.push_back(makeIPAddressForSim(useIPv6, { 2, dc, 1, machine }));
}
// check the sslEnablementMap using only one ip(
// check the sslEnablementMap using only one ip
LocalityData localities(Optional<Standalone<StringRef>>(), zoneId, machineId, dcUID);
localities.set(LiteralStringRef("data_hall"), dcUID);
systemActors->push_back(reportErrors(simulatedMachine(conn, ips, sslEnabled,

View File

@ -388,8 +388,24 @@ struct MachineMemoryInfo {
struct RolesInfo {
std::multimap<NetworkAddress, JsonBuilderObject> roles;
JsonBuilderObject addLatencyStatistics(TraceEventFields const& metrics) {
JsonBuilderObject latencyStats;
latencyStats.setKeyRawNumber("count", metrics.getValue("Count"));
latencyStats.setKeyRawNumber("min", metrics.getValue("Min"));
latencyStats.setKeyRawNumber("max", metrics.getValue("Max"));
latencyStats.setKeyRawNumber("median", metrics.getValue("Median"));
latencyStats.setKeyRawNumber("mean", metrics.getValue("Mean"));
latencyStats.setKeyRawNumber("p25", metrics.getValue("P25"));
latencyStats.setKeyRawNumber("p90", metrics.getValue("P90"));
latencyStats.setKeyRawNumber("p95", metrics.getValue("P95"));
latencyStats.setKeyRawNumber("p99", metrics.getValue("P99"));
latencyStats.setKeyRawNumber("p99.9", metrics.getValue("P99.9"));
return latencyStats;
}
JsonBuilderObject addLatencyBandInfo(TraceEventFields const& metrics) {
JsonBuilderObject latency;
JsonBuilderObject latencyBands;
std::map<std::string, JsonBuilderObject> bands;
for(auto itr = metrics.begin(); itr != metrics.end(); ++itr) {
@ -404,10 +420,10 @@ struct RolesInfo {
continue;
}
latency[band] = StatusCounter(itr->second).getCounter();
latencyBands[band] = StatusCounter(itr->second).getCounter();
}
return latency;
return latencyBands;
}
JsonBuilderObject& addRole( NetworkAddress address, std::string const& role, UID id) {
@ -461,7 +477,12 @@ struct RolesInfo {
TraceEventFields const& readLatencyMetrics = metrics.at("ReadLatencyMetrics");
if(readLatencyMetrics.size()) {
obj["read_latency_bands"] = addLatencyBandInfo(readLatencyMetrics);
obj["read_latency_statistics"] = addLatencyStatistics(readLatencyMetrics);
}
TraceEventFields const& readLatencyBands = metrics.at("ReadLatencyBands");
if(readLatencyBands.size()) {
obj["read_latency_bands"] = addLatencyBandInfo(readLatencyBands);
}
obj["data_lag"] = getLagObject(versionLag);
@ -536,12 +557,25 @@ struct RolesInfo {
try {
TraceEventFields const& grvLatencyMetrics = metrics.at("GRVLatencyMetrics");
if(grvLatencyMetrics.size()) {
obj["grv_latency_bands"] = addLatencyBandInfo(grvLatencyMetrics);
JsonBuilderObject priorityStats;
// We only report default priority now, but this allows us to add other priorities if we want them
priorityStats["default"] = addLatencyStatistics(grvLatencyMetrics);
obj["grv_latency_statistics"] = priorityStats;
}
TraceEventFields const& grvLatencyBands = metrics.at("GRVLatencyBands");
if(grvLatencyBands.size()) {
obj["grv_latency_bands"] = addLatencyBandInfo(grvLatencyBands);
}
TraceEventFields const& commitLatencyMetrics = metrics.at("CommitLatencyMetrics");
if(commitLatencyMetrics.size()) {
obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyMetrics);
obj["commit_latency_statistics"] = addLatencyStatistics(commitLatencyMetrics);
}
TraceEventFields const& commitLatencyBands = metrics.at("CommitLatencyBands");
if(commitLatencyBands.size()) {
obj["commit_latency_bands"] = addLatencyBandInfo(commitLatencyBands);
}
} catch (Error &e) {
if(e.code() != error_code_attribute_not_found) {
@ -1552,7 +1586,7 @@ static Future<vector<std::pair<iface, EventMap>>> getServerMetrics(vector<iface>
ACTOR static Future<vector<std::pair<StorageServerInterface, EventMap>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
vector<std::pair<StorageServerInterface, EventMap>> results = wait(
getServerMetrics(servers, address_workers, std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics", "BusiestReadTag" }));
getServerMetrics(servers, address_workers, std::vector<std::string>{ "StorageMetrics", "ReadLatencyMetrics", "ReadLatencyBands", "BusiestReadTag" }));
return results;
}
@ -1567,7 +1601,7 @@ ACTOR static Future<vector<std::pair<TLogInterface, EventMap>>> getTLogsAndMetri
ACTOR static Future<vector<std::pair<MasterProxyInterface, EventMap>>> getProxiesAndMetrics(Reference<AsyncVar<ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<std::pair<MasterProxyInterface, EventMap>> results = wait(getServerMetrics(
db->get().client.proxies, address_workers, std::vector<std::string>{ "GRVLatencyMetrics", "CommitLatencyMetrics" }));
db->get().client.proxies, address_workers, std::vector<std::string>{ "GRVLatencyMetrics", "CommitLatencyMetrics", "GRVLatencyBands", "CommitLatencyBands" }));
return results;
}

View File

@ -19,7 +19,6 @@
*/
#include "flow/Hash3.h"
#include "flow/Stats.h"
#include "flow/UnitTest.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
@ -38,6 +37,7 @@
#include "fdbserver/IDiskQueue.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/WaitFailure.h"

View File

@ -427,14 +427,20 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& it : self->tLogs) {
for(auto &t : it->logServers) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
failed.push_back(waitFailureClient(t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT /
SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
} else {
changes.push_back(t->onChange());
}
}
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
failed.push_back(waitFailureClient(t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT /
SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
} else {
changes.push_back(t->onChange());
}
@ -443,7 +449,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if (worker->get().present()) {
backupFailed.push_back(waitFailureClient(
worker->get().interf().waitFailure, SERVER_KNOBS->BACKUP_TIMEOUT,
-SERVER_KNOBS->BACKUP_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY));
-SERVER_KNOBS->BACKUP_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
} else {
changes.push_back(worker->onChange());
}
@ -455,7 +462,10 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for(auto& it : old.tLogs) {
for(auto &t : it->logRouters) {
if( t->get().present() ) {
failed.push_back( waitFailureClient( t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
failed.push_back(waitFailureClient(
t->get().interf().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
} else {
changes.push_back(t->onChange());
}
@ -466,7 +476,8 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if (worker->get().present()) {
backupFailed.push_back(waitFailureClient(
worker->get().interf().waitFailure, SERVER_KNOBS->BACKUP_TIMEOUT,
-SERVER_KNOBS->BACKUP_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY));
-SERVER_KNOBS->BACKUP_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
} else {
changes.push_back(worker->onChange());
}
@ -1962,7 +1973,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
if(tLogs->locality == locality) {
for( int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++ ) {
tLogs->logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(logRouterInitializationReplies[nextReplies][i].get())));
failed.push_back( waitFailureClient( logRouterInitializationReplies[nextReplies][i].get().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
failed.push_back(waitFailureClient(
logRouterInitializationReplies[nextReplies][i].get().waitFailure,
SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
}
nextReplies++;
}
@ -1980,7 +1995,11 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < logRouterInitializationReplies[nextReplies].size(); i++ ) {
tLogs->logRouters.emplace_back(new AsyncVar<OptionalInterface<TLogInterface>>(OptionalInterface<TLogInterface>(logRouterInitializationReplies[nextReplies][i].get())));
if(!forRemote) {
failed.push_back( waitFailureClient( logRouterInitializationReplies[nextReplies][i].get().waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
failed.push_back(waitFailureClient(
logRouterInitializationReplies[nextReplies][i].get().waitFailure,
SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
}
}
nextReplies++;

View File

@ -37,12 +37,19 @@ ACTOR Future<Void> waitFailureServer(FutureStream<ReplyPromise<Void>> waitFailur
}
}
ACTOR Future<Void> waitFailureClient(RequestStream<ReplyPromise<Void>> waitFailure, double reactionTime, double reactionSlope, TaskPriority taskID){
ACTOR Future<Void> waitFailureClient(RequestStream<ReplyPromise<Void>> waitFailure, double reactionTime,
double reactionSlope, bool trace, TaskPriority taskID) {
loop {
try {
state double start = now();
ErrorOr<Void> x = wait(waitFailure.getReplyUnlessFailedFor(ReplyPromise<Void>(), reactionTime, reactionSlope, taskID));
if (!x.present()) return Void();
if (!x.present()) {
if (trace) {
TraceEvent("WaitFailureClient")
.detail("FailedEndpoint", waitFailure.getEndpoint().getPrimaryAddress().toString());
}
return Void();
}
double w = start + SERVER_KNOBS->WAIT_FAILURE_DELAY_LIMIT - now();
if (w > 0)
wait( delay( w, taskID ) );
@ -57,7 +64,7 @@ ACTOR Future<Void> waitFailureClient(RequestStream<ReplyPromise<Void>> waitFailu
ACTOR Future<Void> waitFailureClientStrict(RequestStream<ReplyPromise<Void>> waitFailure, double failureReactionTime, TaskPriority taskID){
loop {
wait(waitFailureClient(waitFailure, 0, 0, taskID));
wait(waitFailureClient(waitFailure, 0, 0, false, taskID));
wait(delay(failureReactionTime, taskID) || IFailureMonitor::failureMonitor().onStateEqual( waitFailure.getEndpoint(), FailureStatus(false)));
if(IFailureMonitor::failureMonitor().getState( waitFailure.getEndpoint() ).isFailed()) {
return Void();

View File

@ -25,8 +25,9 @@
Future<Void> waitFailureServer(const FutureStream<ReplyPromise<Void>>& waitFailure);
// talks to a wait failure server, returns Void on failure
Future<Void> waitFailureClient(const RequestStream<ReplyPromise<Void>>& waitFailure,
double const& failureReactionTime=0, double const& failureReactionSlope=0, TaskPriority const& taskID=TaskPriority::DefaultEndpoint);
Future<Void> waitFailureClient(const RequestStream<ReplyPromise<Void>>& waitFailure,
double const& failureReactionTime = 0, double const& failureReactionSlope = 0,
bool const& trace = false, TaskPriority const& taskID = TaskPriority::DefaultEndpoint);
// talks to a wait failure server, returns Void on failure, reaction time is always waited
Future<Void> waitFailureClientStrict(const RequestStream<ReplyPromise<Void>>& waitFailure, double const& failureReactionTime=0, TaskPriority const& taskID=TaskPriority::DefaultEndpoint);

View File

@ -405,17 +405,23 @@ ACTOR Future<Void> newSeedServers( Reference<MasterData> self, RecruitFromConfig
}
Future<Void> waitProxyFailure( vector<MasterProxyInterface> const& proxies ) {
vector<Future<Void>> failed;
for(int i=0; i<proxies.size(); i++)
failed.push_back( waitFailureClient( proxies[i].waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
std::vector<Future<Void>> failed;
for (auto proxy : proxies) {
failed.push_back(waitFailureClient(proxy.waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
}
ASSERT( failed.size() >= 1 );
return tagError<Void>(quorum( failed, 1 ), master_proxy_failed());
}
Future<Void> waitResolverFailure( vector<ResolverInterface> const& resolvers ) {
vector<Future<Void>> failed;
for(int i=0; i<resolvers.size(); i++)
failed.push_back( waitFailureClient( resolvers[i].waitFailure, SERVER_KNOBS->TLOG_TIMEOUT, -SERVER_KNOBS->TLOG_TIMEOUT/SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY ) );
std::vector<Future<Void>> failed;
for (auto resolver : resolvers) {
failed.push_back(waitFailureClient(resolver.waitFailure, SERVER_KNOBS->TLOG_TIMEOUT,
-SERVER_KNOBS->TLOG_TIMEOUT / SERVER_KNOBS->SECONDS_BEFORE_NO_FAILURE_DELAY,
/*trace=*/true));
}
ASSERT( failed.size() >= 1 );
return tagError<Void>(quorum( failed, 1 ), master_resolver_failed());
}

View File

@ -54,7 +54,7 @@
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbrpc/sim_validation.h"
#include "fdbrpc/Smoother.h"
#include "flow/Stats.h"
#include "fdbrpc/Stats.h"
#include "flow/TDMetric.actor.h"
#include <type_traits>
#include "flow/actorcompiler.h" // This must be the last #include.
@ -538,6 +538,7 @@ public:
Counter fetchWaitingMS, fetchWaitingCount, fetchExecutingMS, fetchExecutingCount;
Counter readsRejected;
LatencySample readLatencySample;
LatencyBands readLatencyBands;
Counters(StorageServer* self)
@ -568,7 +569,8 @@ public:
fetchExecutingMS("FetchExecutingMS", cc),
fetchExecutingCount("FetchExecutingCount", cc),
readsRejected("ReadsRejected", cc),
readLatencyBands("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
readLatencySample("ReadLatencyMetrics", self->thisServerID, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
readLatencyBands("ReadLatencyBands", self->thisServerID, SERVER_KNOBS->STORAGE_LOGGING_DELAY)
{
specialCounter(cc, "LastTLogVersion", [self](){ return self->lastTLogVersion; });
specialCounter(cc, "Version", [self](){ return self->version.get(); });
@ -1005,9 +1007,12 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
double duration = timer() - req.requestTime();
data->counters.readLatencySample.addMeasurement(duration);
if(data->latencyBandConfig.present()) {
int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
data->counters.readLatencyBands.addMeasurement(timer() - req.requestTime(), resultSize > maxReadBytes);
data->counters.readLatencyBands.addMeasurement(duration, resultSize > maxReadBytes);
}
return Void();
@ -1614,11 +1619,13 @@ ACTOR Future<Void> getKeyValuesQ( StorageServer* data, GetKeyValuesRequest req )
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
double duration = timer() - req.requestTime();
data->counters.readLatencySample.addMeasurement(duration);
if(data->latencyBandConfig.present()) {
int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
data->counters.readLatencyBands.addMeasurement(
timer() - req.requestTime(), resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset ||
duration, resultSize > maxReadBytes || abs(req.begin.offset) > maxSelectorOffset ||
abs(req.end.offset) > maxSelectorOffset);
}
@ -1684,11 +1691,14 @@ ACTOR Future<Void> getKeyQ( StorageServer* data, GetKeyRequest req ) {
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
double duration = timer() - req.requestTime();
data->counters.readLatencySample.addMeasurement(duration);
if(data->latencyBandConfig.present()) {
int maxReadBytes = data->latencyBandConfig.get().readConfig.maxReadBytes.orDefault(std::numeric_limits<int>::max());
int maxSelectorOffset = data->latencyBandConfig.get().readConfig.maxKeySelectorOffset.orDefault(std::numeric_limits<int>::max());
data->counters.readLatencyBands.addMeasurement(
timer() - req.requestTime(), resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset);
duration, resultSize > maxReadBytes || abs(req.sel.offset) > maxSelectorOffset);
}
return Void();

View File

@ -28,7 +28,8 @@
struct LowLatencyWorkload : TestWorkload {
double testDuration;
double maxLatency;
double maxGRVLatency;
double maxCommitLatency;
double checkDelay;
PerfIntCounter operations, retries;
bool testWrites;
@ -39,7 +40,8 @@ struct LowLatencyWorkload : TestWorkload {
: TestWorkload(wcx), operations("Operations"), retries("Retries") , ok(true)
{
testDuration = getOption( options, LiteralStringRef("testDuration"), 600.0 );
maxLatency = getOption( options, LiteralStringRef("maxLatency"), 20.0 );
maxGRVLatency = getOption(options, LiteralStringRef("maxGRVLatency"), 20.0);
maxCommitLatency = getOption(options, LiteralStringRef("maxCommitLatency"), 30.0);
checkDelay = getOption( options, LiteralStringRef("checkDelay"), 1.0 );
testWrites = getOption(options, LiteralStringRef("testWrites"), true);
testKey = getOption(options, LiteralStringRef("testKey"), LiteralStringRef("testKey"));
@ -64,13 +66,14 @@ struct LowLatencyWorkload : TestWorkload {
wait( delay( self->checkDelay ) );
state Transaction tr( cx );
state double operationStart = now();
state bool doCommit = self->testWrites && deterministicRandom()->coinflip();
state double maxLatency = doCommit ? self->maxCommitLatency : self->maxGRVLatency;
++self->operations;
loop {
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
bool doSet = self->testWrites && deterministicRandom()->coinflip();
if (doSet) {
if (doCommit) {
tr.set(self->testKey, LiteralStringRef(""));
wait(tr.commit());
} else {
@ -82,8 +85,11 @@ struct LowLatencyWorkload : TestWorkload {
++self->retries;
}
}
if(now() - operationStart > self->maxLatency) {
TraceEvent(SevError, "LatencyTooLarge").detail("MaxLatency", self->maxLatency).detail("ObservedLatency", now() - operationStart);
if (now() - operationStart > maxLatency) {
TraceEvent(SevError, "LatencyTooLarge")
.detail("MaxLatency", maxLatency)
.detail("ObservedLatency", now() - operationStart)
.detail("IsCommit", doCommit);
self->ok = false;
}
if( now() - testStart > self->testDuration )

View File

@ -20,8 +20,98 @@
#include "Arena.h"
// See https://dox.ipxe.org/memcheck_8h_source.html and https://dox.ipxe.org/valgrind_8h_source.html for an explanation
// of valgrind client requests
#ifdef VALGRIND_ARENA
#include <memcheck.h>
#else
// Since VALGRIND_ARENA is not set, we don't want to pay the performance penalty for precise tracking of arenas. We'll
// make these macros noops just for this translation unit.
#undef VALGRIND_MAKE_MEM_NOACCESS
#undef VALGRIND_MAKE_MEM_DEFINED
#undef VALGRIND_MAKE_MEM_UNDEFINED
#define VALGRIND_MAKE_MEM_NOACCESS(addr, size) ((void)(addr), (void)(size))
#define VALGRIND_MAKE_MEM_DEFINED(addr, size) ((void)(addr), (void)(size))
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size) ((void)(addr), (void)(size))
#endif
// For each use of arena-internal memory (e.g. ArenaBlock::getSize()), unpoison the memory before use and
// poison it when done.
// When creating a new ArenaBlock, poison the memory that will be later allocated to users.
// When allocating memory to a user, mark that memory as undefined.
namespace {
void allow_access(ArenaBlock* b) {
if (b) {
VALGRIND_MAKE_MEM_DEFINED(b, ArenaBlock::TINY_HEADER);
int headerSize = b->isTiny() ? ArenaBlock::TINY_HEADER : sizeof(ArenaBlock);
VALGRIND_MAKE_MEM_DEFINED(b, headerSize);
}
}
void disallow_access(ArenaBlock* b) {
if (b) {
int headerSize = b->isTiny() ? ArenaBlock::TINY_HEADER : sizeof(ArenaBlock);
VALGRIND_MAKE_MEM_NOACCESS(b, headerSize);
}
}
} // namespace
Arena::Arena() : impl(NULL) {}
Arena::Arena(size_t reservedSize) : impl(0) {
UNSTOPPABLE_ASSERT(reservedSize < std::numeric_limits<int>::max());
if (reservedSize) {
allow_access(impl.getPtr());
ArenaBlock::create((int)reservedSize, impl);
disallow_access(impl.getPtr());
}
}
Arena::Arena(const Arena& r) = default;
Arena::Arena(Arena&& r) noexcept = default;
Arena& Arena::operator=(const Arena& r) = default;
Arena& Arena::operator=(Arena&& r) noexcept = default;
void Arena::dependsOn(const Arena& p) {
if (p.impl) {
allow_access(impl.getPtr());
allow_access(p.impl.getPtr());
ArenaBlock::dependOn(impl, p.impl.getPtr());
disallow_access(p.impl.getPtr());
if (p.impl.getPtr() != impl.getPtr()) {
disallow_access(impl.getPtr());
}
}
}
size_t Arena::getSize() const {
if (impl) {
allow_access(impl.getPtr());
auto result = impl->totalSize();
disallow_access(impl.getPtr());
return result;
}
return 0;
}
bool Arena::hasFree(size_t size, const void* address) {
if (impl) {
allow_access(impl.getPtr());
auto result = impl->unused() >= size && impl->getNextData() == address;
disallow_access(impl.getPtr());
return result;
}
return false;
}
void ArenaBlock::addref() {
VALGRIND_MAKE_MEM_DEFINED(this, sizeof(ThreadSafeReferenceCounted<ArenaBlock>));
ThreadSafeReferenceCounted<ArenaBlock>::addref();
VALGRIND_MAKE_MEM_NOACCESS(this, sizeof(ThreadSafeReferenceCounted<ArenaBlock>));
}
void ArenaBlock::delref() {
if (delref_no_destroy()) destroy();
VALGRIND_MAKE_MEM_DEFINED(this, sizeof(ThreadSafeReferenceCounted<ArenaBlock>));
if (delref_no_destroy()) {
destroy();
} else {
VALGRIND_MAKE_MEM_NOACCESS(this, sizeof(ThreadSafeReferenceCounted<ArenaBlock>));
}
}
bool ArenaBlock::isTiny() const {
@ -52,14 +142,20 @@ const void* ArenaBlock::getNextData() const {
return (const uint8_t*)getData() + used();
}
size_t ArenaBlock::totalSize() {
if (isTiny()) return size();
if (isTiny()) {
return size();
}
size_t s = size();
int o = nextBlockOffset;
while (o) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + o);
VALGRIND_MAKE_MEM_DEFINED(r, sizeof(ArenaBlockRef));
allow_access(r->next);
s += r->next->totalSize();
disallow_access(r->next);
o = r->nextBlockOffset;
VALGRIND_MAKE_MEM_NOACCESS(r, sizeof(ArenaBlockRef));
}
return s;
}
@ -71,8 +167,10 @@ void ArenaBlock::getUniqueBlocks(std::set<ArenaBlock*>& a) {
int o = nextBlockOffset;
while (o) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + o);
VALGRIND_MAKE_MEM_DEFINED(r, sizeof(ArenaBlockRef));
r->next->getUniqueBlocks(a);
o = r->nextBlockOffset;
VALGRIND_MAKE_MEM_NOACCESS(r, sizeof(ArenaBlockRef));
}
return;
}
@ -91,8 +189,10 @@ int ArenaBlock::addUsed(int bytes) {
void ArenaBlock::makeReference(ArenaBlock* next) {
ArenaBlockRef* r = (ArenaBlockRef*)((char*)getData() + bigUsed);
VALGRIND_MAKE_MEM_DEFINED(r, sizeof(ArenaBlockRef));
r->next = next;
r->nextBlockOffset = nextBlockOffset;
VALGRIND_MAKE_MEM_NOACCESS(r, sizeof(ArenaBlockRef));
nextBlockOffset = bigUsed;
bigUsed += sizeof(ArenaBlockRef);
}
@ -107,9 +207,17 @@ void ArenaBlock::dependOn(Reference<ArenaBlock>& self, ArenaBlock* other) {
void* ArenaBlock::allocate(Reference<ArenaBlock>& self, int bytes) {
ArenaBlock* b = self.getPtr();
if (!self || self->unused() < bytes) b = create(bytes, self);
allow_access(b);
if (!self || self->unused() < bytes) {
auto* tmp = b;
b = create(bytes, self);
disallow_access(tmp);
}
return (char*)b->getData() + b->addUsed(bytes);
void* result = (char*)b->getData() + b->addUsed(bytes);
disallow_access(b);
VALGRIND_MAKE_MEM_UNDEFINED(result, bytes);
return result;
}
// Return an appropriately-sized ArenaBlock to store the given data
@ -205,6 +313,7 @@ ArenaBlock* ArenaBlock::create(int dataSize, Reference<ArenaBlock>& next) {
}
b->setrefCountUnsafe(1);
next.setPtrUnsafe(b);
VALGRIND_MAKE_MEM_NOACCESS(reinterpret_cast<uint8_t*>(b) + b->used(), b->unused());
return b;
}
@ -212,18 +321,23 @@ void ArenaBlock::destroy() {
// If the stack never contains more than one item, nothing will be allocated from stackArena.
// If stackArena is used, it will always be a linked list, so destroying *it* will not create another arena
ArenaBlock* tinyStack = this;
allow_access(this);
Arena stackArena;
VectorRef<ArenaBlock*> stack(&tinyStack, 1);
while (stack.size()) {
ArenaBlock* b = stack.end()[-1];
stack.pop_back();
allow_access(b);
if (!b->isTiny()) {
int o = b->nextBlockOffset;
while (o) {
ArenaBlockRef* br = (ArenaBlockRef*)((char*)b->getData() + o);
VALGRIND_MAKE_MEM_DEFINED(br, sizeof(ArenaBlockRef));
allow_access(br->next);
if (br->next->delref_no_destroy()) stack.push_back(stackArena, br->next);
disallow_access(br->next);
o = br->nextBlockOffset;
}
}

View File

@ -92,24 +92,33 @@ class NonCopyable
NonCopyable & operator = (const NonCopyable &);
};
// An Arena is a custom allocator that consists of a set of ArenaBlocks. Allocation is performed by bumping a pointer
// on the most recent ArenaBlock until the block is unable to service the next allocation request. When the current
// ArenaBlock is full, a new (larger) one is added to the Arena. Deallocation is not directly supported. Instead,
// memory is freed by deleting the entire Arena at once. See flow/README.md for details on using Arenas.
class Arena {
public:
inline Arena();
inline explicit Arena( size_t reservedSize );
Arena();
explicit Arena(size_t reservedSize);
//~Arena();
Arena(const Arena&);
Arena(Arena&& r) noexcept;
Arena& operator=(const Arena&);
Arena& operator=(Arena&&) noexcept;
inline void dependsOn( const Arena& p );
inline size_t getSize() const;
void dependsOn(const Arena& p);
size_t getSize() const;
inline bool hasFree( size_t size, const void *address );
bool hasFree(size_t size, const void* address);
friend void* operator new ( size_t size, Arena& p );
friend void* operator new[] ( size_t size, Arena& p );
//private:
bool sameArena(const Arena& other) const {
return impl.getPtr() == other.impl.getPtr();
}
private:
Reference<struct ArenaBlock> impl;
};
@ -146,6 +155,7 @@ struct ArenaBlock : NonCopyable, ThreadSafeReferenceCounted<ArenaBlock>
uint32_t bigSize, bigUsed; // include block header
uint32_t nextBlockOffset;
void addref();
void delref();
bool isTiny() const;
int size() const;
@ -169,28 +179,6 @@ private:
static void* operator new(size_t s); // not implemented
};
inline Arena::Arena() : impl( NULL ) {}
inline Arena::Arena(size_t reservedSize) : impl( 0 ) {
UNSTOPPABLE_ASSERT( reservedSize < std::numeric_limits<int>::max() );
if (reservedSize)
ArenaBlock::create((int)reservedSize,impl);
}
inline Arena::Arena( const Arena& r ) : impl( r.impl ) {}
inline Arena::Arena(Arena&& r) noexcept : impl(std::move(r.impl)) {}
inline Arena& Arena::operator=(const Arena& r) {
impl = r.impl;
return *this;
}
inline Arena& Arena::operator=(Arena&& r) noexcept {
impl = std::move(r.impl);
return *this;
}
inline void Arena::dependsOn( const Arena& p ) {
if (p.impl)
ArenaBlock::dependOn( impl, p.impl.getPtr() );
}
inline size_t Arena::getSize() const { return impl ? impl->totalSize() : 0; }
inline bool Arena::hasFree( size_t size, const void *address ) { return impl && impl->unused() >= size && impl->getNextData() == address; }
inline void* operator new ( size_t size, Arena& p ) {
UNSTOPPABLE_ASSERT( size < std::numeric_limits<int>::max() );
return ArenaBlock::allocate( p.impl, (int)size );
@ -1026,21 +1014,21 @@ public:
using pointer = value_type*;
using reference = value_type&;
friend class SmallVectorRef<T, InlineMembers>;
template<bool I>
friend bool operator<(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend bool operator>(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend bool operator<=(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
friend bool operator>=(const iterator_impl<I>&, const iterator_impl<I>&);
template<bool I>
template <bool I>
friend bool operator<(const iterator_impl<I>&, const iterator_impl<I>&);
template <bool I>
friend bool operator>(const iterator_impl<I>&, const iterator_impl<I>&);
template <bool I>
friend bool operator<=(const iterator_impl<I>&, const iterator_impl<I>&);
template <bool I>
friend bool operator>=(const iterator_impl<I>&, const iterator_impl<I>&);
template <bool I>
friend self_t operator+(const iterator_impl<I>&, difference_type);
template<bool I>
template <bool I>
friend self_t operator+(difference_type, const self_t&);
template<bool I>
template <bool I>
friend self_t operator-(const iterator_impl<I>&, difference_type);
template<bool I>
template <bool I>
friend difference_type operator-(iterator_impl<I>, self_t);
self_t& operator++() {
@ -1049,7 +1037,7 @@ public:
}
self_t operator++(int) {
auto res = *this;
++res;
++(*this);
return res;
}
self_t& operator--() {
@ -1058,7 +1046,7 @@ public:
}
self_t operator--(int) {
auto res = *this;
--res;
--(*this);
return res;
}
self_t& operator+=(difference_type diff) {
@ -1092,10 +1080,9 @@ public: // Construction
static_assert(std::is_trivially_destructible_v<T>);
SmallVectorRef() {}
SmallVectorRef(const SmallVectorRef<T, InlineMembers>& other)
: m_size(other.m_size), m_capacity(std::max(other.m_capacity, InlineMembers)), arr(other.arr), data(other.data) {}
: m_size(other.m_size), arr(other.arr), data(other.data) {}
SmallVectorRef& operator=(const SmallVectorRef<T, InlineMembers>& other) {
m_size = other.m_size;
m_capacity = other.m_capacity;
arr = other.arr;
data = other.data;
return *this;
@ -1103,52 +1090,46 @@ public: // Construction
template <class T2 = T, int IM = InlineMembers>
SmallVectorRef(Arena& arena, const SmallVectorRef<T, IM>& toCopy,
typename std::enable_if<!flow_ref<T2>::value, int>::type = 0)
: m_size(toCopy.m_size), m_capacity(std::max(InlineMembers, toCopy.m_capacity)),
data(toCopy.m_size <= InlineMembers ? nullptr
: (T*)new (arena) uint8_t[sizeof(T) * (toCopy.m_size - InlineMembers)]) {
typename std::enable_if<!flow_ref<T2>::value, int>::type = 0)
: m_size(toCopy.m_size) {
if (toCopy.size() > InlineMembers) {
data.resize(arena, toCopy.size() - InlineMembers);
}
std::copy(toCopy.cbegin(), toCopy.cend(), begin());
}
template <class T2 = T, int IM = InlineMembers>
SmallVectorRef(Arena& arena, const SmallVectorRef<T2, IM>& toCopy,
typename std::enable_if<flow_ref<T2>::value, int>::type = 0)
: m_size(toCopy.m_size), m_capacity(std::max(toCopy.m_capacity, InlineMembers)),
data(toCopy.m_size <= InlineMembers ? nullptr
: (T*)new (arena) uint8_t[sizeof(T) * (toCopy.m_size - InlineMembers)]) {
for (int i = 0; i < toCopy.m_size; ++i) {
: m_size(toCopy.m_size) {
for (int i = 0; i < toCopy.size(); ++i) {
if (i < arr.size()) {
new (&arr[i]) T(arena, toCopy[i]);
} else {
new (&data[i - InlineMembers]) T(arena, toCopy[i]);
data.push_back_deep(arena, toCopy[i]);
}
}
std::copy(toCopy.cbegin(), toCopy.cend(), begin());
}
template <class It>
SmallVectorRef(Arena& arena, It first, It last)
: m_size(0), m_capacity(std::max(int(std::distance(first, last)), InlineMembers)),
data(m_capacity <= InlineMembers ? nullptr
: (T*)new (arena) uint8_t[sizeof(T) * (m_capacity - InlineMembers)]) {
: m_size(0) {
while (first != last && m_size < InlineMembers) {
new (&arr[m_size++]) T(*(first++));
}
while (first != last) {
new (&arr[m_size++ - InlineMembers]) T(*(first++));
data.push_back(arena, *(first++));
}
}
SmallVectorRef(SmallVectorRef<T, InlineMembers>&& o)
: m_size(o.m_size), m_capacity(o.m_capacity), arr(std::move(o.arr)), data(o.data) {
: m_size(o.m_size), arr(std::move(o.arr)), data(std::move(o.data)) {
o.m_size = 0;
o.m_capacity = InlineMembers;
o.data = nullptr;
}
public: // information
int size() const { return m_size; }
int capacity() const { return m_capacity; }
int capacity() const { return InlineMembers + data.capacity(); }
bool empty() const { return m_size == 0; }
public: // element access
@ -1166,15 +1147,12 @@ public: // element access
public: // Modification
void push_back(Arena& arena, T const& value) {
UNSTOPPABLE_ASSERT(m_capacity >= m_size && m_capacity >= InlineMembers);
if (m_size < InlineMembers) {
new (&arr[m_size++]) T(value);
return;
}
if (m_size == m_capacity) {
reallocate(arena, m_capacity + 1);
}
new (&data[m_size++ - InlineMembers]) T(value);
++m_size;
data.push_back(arena, value);
}
void push_back_deep(Arena& arena, T const& value) {
@ -1182,13 +1160,11 @@ public: // Modification
new (&arr[m_size++]) T(arena, value);
return;
}
if (m_size == m_capacity) {
reallocate(arena, m_capacity + 1);
}
new (&data[m_size++ - InlineMembers]) T(arena, value);
++m_size;
data.push_back_deep(arena, value);
}
void pop_back() { --m_size; }
void pop_back() {--m_size; }
template <class It>
void append(Arena& arena, It first, It last) {
@ -1196,14 +1172,14 @@ public: // Modification
return;
}
auto d = std::distance(first, last);
if (m_size + d < m_capacity) {
reallocate(arena, m_capacity);
if (m_size + d > InlineMembers) {
data.reserve(arena, m_size + d - InlineMembers);
}
while (first != last && m_size < InlineMembers) {
new (&(arr[m_size++])) T(*(first++));
}
while (first != last) {
new (&data[m_size++ - InlineMembers]) T(*(first++));
data.push_back(arena, *(first++));
}
}
@ -1213,14 +1189,14 @@ public: // Modification
return;
}
auto d = std::distance(first, last);
if (m_size + d < m_capacity) {
reallocate(arena, m_capacity);
if (m_size + d > InlineMembers) {
data.reserve(arena, m_size + d - InlineMembers);
}
while (first != last && m_size < InlineMembers) {
new (&(arr[m_size++])) T(arena, *(first++));
}
while (first != last) {
new (&data[m_size++ - InlineMembers]) T(arena, *(first++));
data.push_back_deep(arena, *(first++));
}
}
@ -1269,22 +1245,10 @@ public: // iterator access
const_reverse_iterator rend() const { return crend(); }
private: // memory management
void reallocate(Arena& p, int requiredCapacity) {
requiredCapacity = std::max(m_capacity * 2, requiredCapacity);
// SOMEDAY: Maybe we are right at the end of the arena and can expand cheaply
T* newData = new (p) T[requiredCapacity - InlineMembers];
if (m_size > InlineMembers) {
std::move(data, data + m_size - InlineMembers, newData);
}
data = newData;
m_capacity = requiredCapacity;
}
private:
int m_size = 0, m_capacity = InlineMembers;
int m_size = 0;
std::array<T, InlineMembers> arr;
T* data = nullptr;
VectorRef<T> data;
};
template <class T, int InlineMembers, bool isConst>

View File

@ -48,8 +48,6 @@ set(FLOW_SRCS
SignalSafeUnwind.cpp
SignalSafeUnwind.h
SimpleOpt.h
Stats.actor.cpp
Stats.h
SystemMonitor.cpp
SystemMonitor.h
TDMetric.actor.h

View File

@ -333,6 +333,9 @@ void FastAllocator<Size>::release(void *ptr) {
ASSERT(!thr.freelist == (thr.count == 0)); // freelist is empty if and only if count is 0
#if VALGRIND
VALGRIND_MAKE_MEM_DEFINED(ptr, sizeof(void*));
#endif
++thr.count;
*(void**)ptr = thr.freelist;
//check(ptr, false);

View File

@ -112,7 +112,7 @@ Reference<IThreadPool> createGenericThreadPool();
class DummyThreadPool : public IThreadPool, ReferenceCounted<DummyThreadPool> {
public:
~DummyThreadPool() {}
DummyThreadPool() : thread(NULL) {}
DummyThreadPool() : thread(nullptr) {}
Future<Void> getError() {
return errors.getFuture();
}

View File

@ -2388,11 +2388,10 @@ ACTOR Future<vector<std::string>> findFiles( std::string directory, std::string
}
if (!FindNextFile( h, &fd ))
break;
if (async && __rdtsc() - tsc_begin > FLOW_KNOBS->TSC_YIELD_TIME) {
if (async && __rdtsc() - tsc_begin > FLOW_KNOBS->TSC_YIELD_TIME && !g_network->isSimulated()) {
wait( yield() );
tsc_begin = __rdtsc();
}
}
if (GetLastError() != ERROR_NO_MORE_FILES) {
TraceEvent(SevError, "FindNextFile").detail("Directory", directory).detail("Extension", extension).GetLastError();
@ -2450,7 +2449,7 @@ ACTOR Future<vector<std::string>> findFiles( std::string directory, std::string
(!directoryOnly && acceptFile(buf.st_mode, name, extension))) {
result.push_back( name );
}
if (async && __rdtsc() - tsc_begin > FLOW_KNOBS->TSC_YIELD_TIME) {
if (async && __rdtsc() - tsc_begin > FLOW_KNOBS->TSC_YIELD_TIME && !g_network->isSimulated()) {
wait( yield() );
tsc_begin = __rdtsc();
}

View File

@ -615,7 +615,7 @@ namespace actorcompiler
{
LineNumber(cx.target, stmt.FirstSourceLine);
if (stmt.decl.initializerConstructorSyntax || stmt.decl.initializer=="")
cx.target.WriteLine("{0} = std::move( {1}({2}) );", stmt.decl.name, stmt.decl.type, stmt.decl.initializer);
cx.target.WriteLine("{0} = {1}({2});", stmt.decl.name, stmt.decl.type, stmt.decl.initializer);
else
cx.target.WriteLine("{0} = {1};", stmt.decl.name, stmt.decl.initializer);
}

View File

@ -32,7 +32,7 @@
<Wix xmlns='http://schemas.microsoft.com/wix/2006/wi'>
<Product Name='$(var.Title)'
Id='{0EFA1E57-0081-4CB5-8502-F0779A0C59F5}'
Id='{C2791390-0993-4F6B-9708-ED2A4558A013}'
UpgradeCode='{A95EA002-686E-4164-8356-C715B7F8B1C8}'
Version='$(var.Version)'
Manufacturer='$(var.Manufacturer)'

View File

@ -13,7 +13,8 @@ connectionFailuresDisableDuration = 60
[[test.workload]]
testName = 'LowLatency'
testDuration = 300.0
maxLatency = 50.0
maxGRVLatency = 50.0
testWrites = false
[[test.workload]]
testName = 'Attrition'