Merge branch 'master' into feature-metadata-version

# Conflicts:
#	fdbclient/DatabaseContext.h
This commit is contained in:
Evan Tschannen 2019-03-03 22:58:45 -08:00
commit 075fdef31a
41 changed files with 1025 additions and 218 deletions

View File

@ -77,6 +77,12 @@ else
CCACHE_CXX := $(CXX)
endif
# Default variables don't get pushed into the environment, but scripts in build/
# rely on the existence of CC in the environment.
ifeq ($(origin CC), default)
CC := $(CC)
endif
ACTORCOMPILER := bin/actorcompiler.exe
# UNSTRIPPED := 1

View File

@ -60,7 +60,12 @@ class RandomGenerator(object):
sign = -1 if random.random() < 0.5 else 1
exponent = random.randint(-(1 << (exp_bits - 1)) - 10, (1 << (exp_bits - 1) - 1))
mantissa = random.random()
return sign * math.pow(2, exponent) * mantissa
result = sign * math.pow(2, exponent) * mantissa
if random.random() < 0.05:
result = float(int(result))
return result
def random_tuple(self, max_size, incomplete_versionstamps=False):
size = random.randint(1, max_size)

View File

@ -79,7 +79,7 @@ class SingleFloat(object):
self.value = ctypes.c_float(value).value
elif isinstance(value, ctypes.c_float):
self.value = value.value
elif isinstance(value, six.integertypes):
elif isinstance(value, six.integer_types):
self.value = ctypes.c_float(value).value
else:
raise ValueError("Incompatible type for single-precision float: " + repr(value))

View File

@ -21,6 +21,7 @@
import ctypes
import math
import sys
import os
import struct
@ -498,6 +499,8 @@ class Tester:
elif inst.op == six.u("ENCODE_FLOAT"):
f_bytes = inst.pop()
f = struct.unpack(">f", f_bytes)[0]
if not math.isnan(f) and not math.isinf(f) and not f == -0.0 and f == int(f):
f = int(f)
inst.push(fdb.tuple.SingleFloat(f))
elif inst.op == six.u("ENCODE_DOUBLE"):
d_bytes = inst.pop()

View File

@ -2,11 +2,6 @@
set -e
if [ -z ${CC+x} ]
then
CC=gcc
fi
case $1 in
Application | DynamicLibrary)
echo "Linking $3"

View File

@ -49,7 +49,7 @@ Encoding: `b'\x05' + ''.join(map(lambda x: b'\x00\xff' if x is None else pack(x)
Test case: `pack( ("foo\x00bar", None, ()) ) == b'\x05\x01foo\x00\xffbar\x00\x00\xff\x05\x00\x00'`
Status: Standard
The list is ended with a 0x00 byte. Nulls within the tuple are encoded as `\x00\xff`. There is no other null escaping. In particular, 0x00 bytes that are within the nested types can be left as-is as they are passed over when decoding the interior types. To show how this fixes the bug in the previous version of nested tuples, the empty tuple is now encoded as `\x05\x00` while the tuple containing only null is encoded as `\x05\x00\xff\x00`, so the first tuple will sort first.
The list ends with a 0x00 byte. Nulls within the tuple are encoded as `\x00\xff`. There is no other null escaping. In particular, 0x00 bytes that are within the nested types can be left as-is as they are passed over when decoding the interior types. To show how this fixes the bug in the previous version of nested tuples, the empty tuple is now encoded as `\x05\x00` while the tuple containing only null is encoded as `\x05\x00\xff\x00`, so the first tuple will sort first.
### **Negative arbitrary-precision Integer**
@ -90,7 +90,7 @@ Typecodes:
&nbsp;`0x21` - double (64 bits)
&nbsp;`0x22` - long double (80 bits)
Length: 4 - 10 bytes
Test case: `pack( -42f ) == b'=\xd7\xff\xff'`
Test case: `pack( -42f ) == b'\x20\x3d\xd7\xff\xff'`
Encoding: Big-endian IEEE binary representation, followed by the following transformation:
```python
if ord(rep[0])&0x80: # Check sign bit

View File

@ -13,6 +13,7 @@ Improved replication mechanism, a new hierarchical replication technique that fu
* Get read version, read, and commit requests are counted and aggregated by server-side latency in configurable latency bands and output in JSON status. `(PR #1084) <https://github.com/apple/foundationdb/pull/1084>`_
* Added configuration option to choose log spilling implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Added configuration option to choose log system implementation `(PR #1160) <https://github.com/apple/foundationdb/pull/1160>`_
* Batch priority transactions are now limited separately by ratekeeper and will be throttled at lower levels of cluster saturation. This makes it possible to run a more intense background load at saturation without significantly affecting normal priority transactions. It is still recommended not to run excessive loads at batch priority. `(PR #1198) <https://github.com/apple/foundationdb/pull/1198>`_
Performance
-----------
@ -20,6 +21,8 @@ Performance
Fixes
-----
* Python: Creating a ``SingleFloat`` for the tuple layer didn't work with integers. `(PR #1216) <https://github.com/apple/foundationdb/pull/1216>`_
Status
------

View File

@ -69,8 +69,8 @@ Code
In this example, we're storing user data based on user_ID but sometimes need to retrieve users based on their zipcode. We use a transactional function to set user data and its index and another to retrieve data using the index.
::
user = Subspace(('user',))
index = Subspace(('zipcode_index',))
user = fdb.Subspace(('user',))
index = fdb.Subspace(('zipcode_index',))
@fdb.transactional
def set_user(tr, ID, name, zipcode):
@ -80,11 +80,14 @@ In this example, were storing user data based on user_ID but sometimes need t
# Normal lookup
@fdb.transactional
def get_user(tr, ID):
return tr[user[ID]]
for k,v in tr[user[ID].range()]:
return v
return None
# Index lookup
@fdb.transactional
def get_user_IDs_in_region(tr, region):
return [index.unpack(k)[1] for k, _ in tr[index[region].range()]]
def get_user_IDs_in_region(tr, zipcode):
return [index.unpack(k)[1] for k, _ in tr[index[zipcode].range()]]
That's just about all you need to create an index.

View File

@ -162,6 +162,9 @@ public:
int mvCacheInsertLocation;
std::vector<std::pair<Version, Optional<Value>>> metadataVersionCache;
HealthMetrics healthMetrics;
Future<Void> updateHealthMetrics;
};
#endif

View File

@ -724,4 +724,72 @@ struct ClusterControllerPriorityInfo {
}
};
// Aggregated health figures: a handful of cluster-wide summary values plus
// optional per-process maps keyed by UID.
struct HealthMetrics {
	// Figures recorded for a single entry of storageStats.
	struct StorageStats {
		int64_t storageQueue;
		int64_t storageDurabilityLag;
		double diskUsage;
		double cpuUsage;

		// Member-wise equality; the double members are compared exactly.
		bool operator==(StorageStats const &r) const {
			if (storageQueue != r.storageQueue) return false;
			if (storageDurabilityLag != r.storageDurabilityLag) return false;
			if (diskUsage != r.diskUsage) return false;
			return cpuUsage == r.cpuUsage;
		}

		// The order of the fields below defines the serialized layout.
		template <class Ar>
		void serialize(Ar& ar) {
			serializer(ar, storageQueue, storageDurabilityLag, diskUsage, cpuUsage);
		}
	};

	// Summary values; always overwritten by update().
	int64_t worstStorageQueue;
	int64_t worstStorageDurabilityLag;
	int64_t worstTLogQueue;
	double tpsLimit;

	// Detailed per-process values; see update() for when they are replaced or cleared.
	std::map<UID, StorageStats> storageStats;
	std::map<UID, int64_t> tLogQueue;

	// All summary values start at zero and both maps start empty.
	HealthMetrics()
	  : worstStorageQueue(0),
	    worstStorageDurabilityLag(0),
	    worstTLogQueue(0),
	    tpsLimit(0.0)
	{}

	// Copies the summary values from hm unconditionally, then handles the maps:
	//   - detailedOutput == false: both maps are emptied.
	//   - detailedOutput == true and detailedInput == true: both maps are copied from hm.
	//   - detailedOutput == true and detailedInput == false: the maps keep their current contents.
	void update(const HealthMetrics& hm, bool detailedInput, bool detailedOutput)
	{
		worstStorageQueue = hm.worstStorageQueue;
		worstStorageDurabilityLag = hm.worstStorageDurabilityLag;
		worstTLogQueue = hm.worstTLogQueue;
		tpsLimit = hm.tpsLimit;

		if (detailedOutput) {
			if (detailedInput) {
				storageStats = hm.storageStats;
				tLogQueue = hm.tLogQueue;
			}
			return;
		}

		storageStats.clear();
		tLogQueue.clear();
	}

	// Compares the three "worst" values and both maps.
	// NOTE(review): tpsLimit is serialized but not compared here, so two objects that
	// differ only in tpsLimit are reported equal - confirm this is intentional.
	bool operator==(HealthMetrics const& r) const {
		if (worstStorageQueue != r.worstStorageQueue) return false;
		if (worstStorageDurabilityLag != r.worstStorageDurabilityLag) return false;
		if (worstTLogQueue != r.worstTLogQueue) return false;
		if (!(storageStats == r.storageStats)) return false;
		return tLogQueue == r.tLogQueue;
	}

	// The order of the fields below defines the serialized layout.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, worstStorageQueue, worstStorageDurabilityLag, worstTLogQueue, tpsLimit, storageStats, tLogQueue);
	}
};
#endif

View File

@ -71,6 +71,8 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( STORAGE_METRICS_SHARD_LIMIT, 100 ); if( randomize && BUGGIFY ) STORAGE_METRICS_SHARD_LIMIT = 3;
init( STORAGE_METRICS_UNFAIR_SPLIT_LIMIT, 2.0/3.0 );
init( STORAGE_METRICS_TOO_MANY_SHARDS_DELAY, 15.0 );
init( UPDATE_HEALTH_METRICS_INTERVAL, 0.5 );
init( UPDATE_DETAILED_HEALTH_METRICS_INTERVAL, 5.0 );
//KeyRangeMap
init( KRM_GET_RANGE_LIMIT, 1e5 ); if( randomize && BUGGIFY ) KRM_GET_RANGE_LIMIT = 10;

View File

@ -70,6 +70,8 @@ public:
int STORAGE_METRICS_SHARD_LIMIT;
double STORAGE_METRICS_UNFAIR_SPLIT_LIMIT;
double STORAGE_METRICS_TOO_MANY_SHARDS_DELAY;
double UPDATE_HEALTH_METRICS_INTERVAL;
double UPDATE_DETAILED_HEALTH_METRICS_INTERVAL;
//KeyRangeMap
int KRM_GET_RANGE_LIMIT;

View File

@ -45,6 +45,8 @@ struct MasterProxyInterface {
RequestStream< struct GetRawCommittedVersionRequest > getRawCommittedVersion;
RequestStream< struct TxnStateRequest > txnState;
RequestStream< struct GetHealthMetricsRequest > getHealthMetrics;
UID id() const { return commit.getEndpoint().token; }
std::string toString() const { return id().shortString(); }
bool operator == (MasterProxyInterface const& r) const { return id() == r.id(); }
@ -53,7 +55,9 @@ struct MasterProxyInterface {
template <class Archive>
void serialize(Archive& ar) {
serializer(ar, locality, commit, getConsistentReadVersion, getKeyServersLocations, waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion, txnState);
serializer(ar, locality, commit, getConsistentReadVersion, getKeyServersLocations,
waitFailure, getStorageServerRejoinInfo, getRawCommittedVersion,
txnState, getHealthMetrics);
}
void initEndpoints() {
@ -231,4 +235,47 @@ struct TxnStateRequest {
}
};
// Request sent on MasterProxyInterface::getHealthMetrics. The `detailed` flag
// selects which of the proxy's two cached replies is returned.
struct GetHealthMetricsRequest
{
	ReplyPromise<struct GetHealthMetricsReply> reply;
	bool detailed;

	// A default-constructed request is non-detailed.
	explicit GetHealthMetricsRequest(bool detailed = false)
	  : detailed(detailed)
	{}

	// Serializes the reply promise followed by the detailed flag.
	template <class Ar>
	void serialize(Ar& ar)
	{
		serializer(ar, reply, detailed);
	}
};
struct GetHealthMetricsReply
{
Standalone<StringRef> serialized;
HealthMetrics healthMetrics;
explicit GetHealthMetricsReply(const HealthMetrics& healthMetrics = HealthMetrics()) :
healthMetrics(healthMetrics)
{
update(healthMetrics, true, true);
}
void update(const HealthMetrics& healthMetrics, bool detailedInput, bool detailedOutput)
{
this->healthMetrics.update(healthMetrics, detailedInput, detailedOutput);
BinaryWriter bw(IncludeVersion());
bw << this->healthMetrics;
serialized = Standalone<StringRef>(bw.toStringRef());
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, serialized);
if (ar.isDeserializing) {
BinaryReader br(serialized, IncludeVersion());
br >> healthMetrics;
}
}
};
#endif

View File

@ -466,6 +466,32 @@ ACTOR static Future<Void> monitorMasterProxiesChange(Reference<AsyncVar<ClientDB
}
}
// Background actor that keeps cx->healthMetrics up to date.
//
// Every CLIENT_KNOBS->UPDATE_HEALTH_METRICS_INTERVAL seconds it sends a
// GetHealthMetricsRequest to one of the master proxies. A detailed request is sent
// only when networkOptions.sendDetailedHealthMetrics is set and more than
// CLIENT_KNOBS->UPDATE_DETAILED_HEALTH_METRICS_INTERVAL seconds have passed since the
// last detailed request completed. The option is re-read on every iteration.
//
// The actor never returns on its own; it runs until its future is cancelled.
ACTOR static Future<Void> updateHealthMetricsActor(DatabaseContext *cx) {
	// Time of the last completed detailed request; 0 means none has completed yet.
	state double lastDetailed = 0;
	loop {
		wait( delay(CLIENT_KNOBS->UPDATE_HEALTH_METRICS_INTERVAL) );
		state bool sendDetailed = networkOptions.sendDetailedHealthMetrics && now() - lastDetailed > CLIENT_KNOBS->UPDATE_DETAILED_HEALTH_METRICS_INTERVAL;
		// Retry loop: if the set of master proxies changes before a reply arrives,
		// go around again and issue the request against the new set.
		loop {
			choose {
				when(wait(cx->onMasterProxiesChanged())) {}
				// With no usable proxy list this arm waits on Never(), so only a proxy
				// change can make progress.
				when(state GetHealthMetricsReply rep =
					 wait(cx->getMasterProxies().isValid() && cx->getMasterProxies()->size() ?
						  loadBalance(cx->getMasterProxies(),
									  &MasterProxyInterface::getHealthMetrics,
									  GetHealthMetricsRequest(sendDetailed)) :
						  Never())) {
					// detailedOutput is always true here, so a non-detailed reply refreshes
					// the summary values without clearing the stored per-process maps.
					cx->healthMetrics.update(rep.healthMetrics, sendDetailed, true);
					break;
				}
			}
		}
		if(sendDetailed) {
			lastDetailed = now();
		}
	}
}
DatabaseContext::DatabaseContext(
Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId,
int taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion )
@ -489,6 +515,8 @@ DatabaseContext::DatabaseContext(
monitorMasterProxiesInfoChange = monitorMasterProxiesChange(clientInfo, &masterProxiesChangeTrigger);
clientStatusUpdater.actor = clientStatusUpdateActor(this);
updateHealthMetrics = updateHealthMetricsActor(this);
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {}
@ -911,6 +939,17 @@ void setNetworkOption(FDBNetworkOptions::Option option, Optional<StringRef> valu
validateOptionValue(value, false);
networkOptions.slowTaskProfilingEnabled = true;
break;
case FDBNetworkOptions::SEND_DETAILED_HEALTH_METRICS:
validateOptionValue(value, true);
int sendDetailedHealthMetrics;
try {
sendDetailedHealthMetrics = std::stoi(value.get().toString());
} catch (...) {
TraceEvent(SevWarnAlways, "InvalidDetailedMetricsOptionValue").detail("Value", value.get().toString());
throw invalid_option_value();
}
networkOptions.sendDetailedHealthMetrics = (sendDetailedHealthMetrics > 0);
break;
default:
break;
}

View File

@ -56,12 +56,13 @@ struct NetworkOptions {
Optional<bool> logClientInfo;
Standalone<VectorRef<ClientVersionRef>> supportedVersions;
bool slowTaskProfilingEnabled;
bool sendDetailedHealthMetrics;
// The default values, TRACE_DEFAULT_ROLL_SIZE and TRACE_DEFAULT_MAX_LOGS_SIZE are located in Trace.h.
NetworkOptions()
: localAddress(""), clusterFile(""), traceDirectory(Optional<std::string>()),
traceRollSize(TRACE_DEFAULT_ROLL_SIZE), traceMaxLogsSize(TRACE_DEFAULT_MAX_LOGS_SIZE), traceLogGroup("default"),
traceFormat("xml"), slowTaskProfilingEnabled(false) {}
traceFormat("xml"), slowTaskProfilingEnabled(false), sendDetailedHealthMetrics(false) {}
};
class Database {

View File

@ -230,6 +230,25 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"qos":{
"worst_queue_bytes_log_server":460,
"batch_performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
"name":{
"$enum":[
"workload",
"storage_server_write_queue_size",
"storage_server_write_bandwidth_mvcc",
"storage_server_readable_behind",
"log_server_mvcc_write_bandwidth",
"log_server_write_queue",
"storage_server_min_free_space",
"storage_server_min_free_space_ratio",
"log_server_min_free_space",
"log_server_min_free_space_ratio"
]
},
"description":"The database is not being saturated by the workload."
},
"performance_limited_by":{
"reason_server_id":"7f8d623d0cb9966e",
"reason_id":0,
@ -249,7 +268,9 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
},
"description":"The database is not being saturated by the workload."
},
"batch_transactions_per_second_limit":0,
"transactions_per_second_limit":0,
"batch_released_transactions_per_second":0,
"released_transactions_per_second":0,
"limiting_queue_bytes_storage_server":0,
"worst_queue_bytes_storage_server":0,

View File

@ -346,11 +346,14 @@ struct StorageQueuingMetricsReply {
int64_t instanceID; // changes if bytesDurable and bytesInput reset
int64_t bytesDurable, bytesInput;
StorageBytes storageBytes;
Version v; // current storage server version
Version version; // current storage server version
Version durableVersion; // latest version durable on storage server
double cpuUsage;
double diskUsage;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, localTime, instanceID, bytesDurable, bytesInput, v, storageBytes);
serializer(ar, localTime, instanceID, bytesDurable, bytesInput, version, storageBytes, durableVersion, cpuUsage, diskUsage);
}
};

View File

@ -107,6 +107,9 @@ description is not currently required but encouraged.
description="Disables logging of client statistics, such as sampled transaction activity." />
<Option name="enable_slow_task_profiling" code="71"
description="Enables debugging feature to perform slow task profiling. Requires trace logging to be enabled. WARNING: this feature is not recommended for use in production." />
<Option name="send_detailed_health_metrics" code="72"
paramType="Int" paramDescription="Positive value enables sending of detailed health metrics"
description="Send per-process map of health metrics to client"/>
<Option name="supported_client_versions" code="1000"
paramType="String" paramDescription="[release version],[source version],[protocol version];..."
description="This option is set automatically to communicate the list of supported clients to the active client."

View File

@ -162,6 +162,7 @@ set(FDBSERVER_SRCS
workloads/TargetedKill.actor.cpp
workloads/TaskBucketCorrectness.actor.cpp
workloads/ThreadSafety.actor.cpp
workloads/Throttling.actor.cpp
workloads/Throughput.actor.cpp
workloads/TimeKeeperCorrectness.actor.cpp
workloads/UnitPerf.actor.cpp

View File

@ -21,6 +21,7 @@
#ifndef FDBSERVER_DATADISTRIBUTORINTERFACE_H
#define FDBSERVER_DATADISTRIBUTORINTERFACE_H
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/Locality.h"
@ -51,24 +52,29 @@ struct DataDistributorInterface {
struct GetRateInfoRequest {
UID requesterID;
int64_t totalReleasedTransactions;
int64_t batchReleasedTransactions;
bool detailed;
ReplyPromise<struct GetRateInfoReply> reply;
GetRateInfoRequest() {}
GetRateInfoRequest( UID const& requesterID, int64_t totalReleasedTransactions ) : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions) {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, reply);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply);
}
};
struct GetRateInfoReply {
double transactionRate;
double batchTransactionRate;
double leaseDuration;
HealthMetrics healthMetrics;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionRate, leaseDuration);
serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics);
}
};

View File

@ -342,15 +342,20 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( SMOOTHING_AMOUNT, 1.0 ); if( slowRateKeeper ) SMOOTHING_AMOUNT = 5.0;
init( SLOW_SMOOTHING_AMOUNT, 10.0 ); if( slowRateKeeper ) SLOW_SMOOTHING_AMOUNT = 50.0;
init( METRIC_UPDATE_RATE, .1 ); if( slowRateKeeper ) METRIC_UPDATE_RATE = 0.5;
init( DETAILED_METRIC_UPDATE_RATE, 5.0 );
bool smallStorageTarget = randomize && BUGGIFY;
init( TARGET_BYTES_PER_STORAGE_SERVER, 1000e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER = 3000e3;
init( SPRING_BYTES_STORAGE_SERVER, 100e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER = 300e3;
init( TARGET_BYTES_PER_STORAGE_SERVER_BATCH, 500e6 ); if( smallStorageTarget ) TARGET_BYTES_PER_STORAGE_SERVER_BATCH = 1500e3;
init( SPRING_BYTES_STORAGE_SERVER_BATCH, 50e6 ); if( smallStorageTarget ) SPRING_BYTES_STORAGE_SERVER_BATCH = 150e3;
init( STORAGE_HARD_LIMIT_BYTES, 1500e6 ); if( smallStorageTarget ) STORAGE_HARD_LIMIT_BYTES = 4500e3;
bool smallTlogTarget = randomize && BUGGIFY;
init( TARGET_BYTES_PER_TLOG, 2400e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG = 2000e3;
init( SPRING_BYTES_TLOG, 400e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG = 200e3;
init( TARGET_BYTES_PER_TLOG_BATCH, 1000e6 ); if( smallTlogTarget ) TARGET_BYTES_PER_TLOG_BATCH = 1000e3;
init( SPRING_BYTES_TLOG_BATCH, 200e6 ); if( smallTlogTarget ) SPRING_BYTES_TLOG_BATCH = 100e3;
init( TLOG_SPILL_THRESHOLD, 1500e6 ); if( smallTlogTarget ) TLOG_SPILL_THRESHOLD = 1500e3; if( randomize && BUGGIFY ) TLOG_SPILL_THRESHOLD = 0;
init( TLOG_HARD_LIMIT_BYTES, 3000e6 ); if( smallTlogTarget ) TLOG_HARD_LIMIT_BYTES = 3000e3;
init( TLOG_RECOVER_MEMORY_LIMIT, TARGET_BYTES_PER_TLOG + SPRING_BYTES_TLOG );
@ -361,6 +366,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( MIN_FREE_SPACE_RATIO, 0.05 );
init( MAX_TL_SS_VERSION_DIFFERENCE, 1e99 ); // if( randomize && BUGGIFY ) MAX_TL_SS_VERSION_DIFFERENCE = std::max(1.0, 0.25 * VERSIONS_PER_SECOND); // spring starts at half this value //FIXME: this knob causes ratekeeper to clamp on idle cluster in simulation that have a large number of logs
init( MAX_TL_SS_VERSION_DIFFERENCE_BATCH, 1e99 );
init( MAX_MACHINES_FALLING_BEHIND, 1 );
//Storage Metrics
@ -389,6 +395,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs) {
init( LONG_BYTE_SAMPLE_RECOVERY_DELAY, 60.0 );
init( BYTE_SAMPLE_LOAD_PARALLELISM, 32 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_LOAD_PARALLELISM = 1;
init( BYTE_SAMPLE_LOAD_DELAY, 0.0 ); if( randomize && BUGGIFY ) BYTE_SAMPLE_LOAD_DELAY = 0.1;
init( UPDATE_STORAGE_PROCESS_STATS_INTERVAL, 5.0 );
//Wait Failure
init( BUGGIFY_OUTSTANDING_WAIT_FAILURE_REQUESTS, 2 );

View File

@ -282,13 +282,18 @@ public:
double SMOOTHING_AMOUNT;
double SLOW_SMOOTHING_AMOUNT;
double METRIC_UPDATE_RATE;
double DETAILED_METRIC_UPDATE_RATE;
double LAST_LIMITED_RATIO;
int64_t TARGET_BYTES_PER_STORAGE_SERVER;
double SPRING_BYTES_STORAGE_SERVER;
int64_t SPRING_BYTES_STORAGE_SERVER;
int64_t TARGET_BYTES_PER_STORAGE_SERVER_BATCH;
int64_t SPRING_BYTES_STORAGE_SERVER_BATCH;
int64_t TARGET_BYTES_PER_TLOG;
double SPRING_BYTES_TLOG;
int64_t SPRING_BYTES_TLOG;
int64_t TARGET_BYTES_PER_TLOG_BATCH;
int64_t SPRING_BYTES_TLOG_BATCH;
int64_t TLOG_SPILL_THRESHOLD;
int64_t TLOG_HARD_LIMIT_BYTES;
int64_t TLOG_RECOVER_MEMORY_LIMIT;
@ -299,6 +304,7 @@ public:
double MIN_FREE_SPACE_RATIO;
double MAX_TL_SS_VERSION_DIFFERENCE; // spring starts at half this value
double MAX_TL_SS_VERSION_DIFFERENCE_BATCH;
int MAX_MACHINES_FALLING_BEHIND;
//Storage Metrics
@ -328,6 +334,7 @@ public:
double LONG_BYTE_SAMPLE_RECOVERY_DELAY;
int BYTE_SAMPLE_LOAD_PARALLELISM;
double BYTE_SAMPLE_LOAD_DELAY;
double UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
//Wait Failure
int BUGGIFY_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@ -87,19 +87,26 @@ Future<Void> forwardValue(Promise<T> out, Future<T> in)
int getBytes(Promise<Version> const& r) { return 0; }
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, double* outTransactionRate) {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
state double lastDetailedReply = 0.0; // request detailed metrics immediately
state bool expectingDetailedReply = false;
state int64_t lastTC = 0;
if (db->get().distributor.present()) nextRequestTimer = Void();
if (db->get().distributor.present()) {
nextRequestTimer = Void();
}
loop choose {
when ( wait( db->onChange() ) ) {
if ( db->get().distributor.present() ) {
TraceEvent("Proxy_DataDistributorChanged", myID)
.detail("DDID", db->get().distributor.get().id());
nextRequestTimer = Void(); // trigger GetRate request
nextRequestTimer = Void(); // trigger GetRate request
} else {
TraceEvent("Proxy_DataDistributorDied", myID);
nextRequestTimer = Never();
@ -108,19 +115,28 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount)));
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().distributor.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
expectingDetailedReply = detailed;
}
when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never();
*outTransactionRate = rep.transactionRate;
// TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
*outBatchTransactionRate = rep.batchTransactionRate;
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
healthMetricsReply->update(rep.healthMetrics, expectingDetailedReply, true);
if (expectingDetailedReply) {
detailedHealthMetricsReply->update(rep.healthMetrics, true, true);
lastDetailedReply = now();
}
}
when ( wait(leaseTimeout ) ) {
when ( wait( leaseTimeout ) ) {
*outTransactionRate = 0;
// TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("Lease", "Expired");
*outBatchTransactionRate = 0;
//TraceEvent("MasterProxyRate", myID).detail("Rate", 0).detail("BatchRate", 0).detail("Lease", "Expired");
leaseTimeout = Never();
}
}
@ -1066,6 +1082,27 @@ ACTOR Future<Void> fetchVersions(ProxyCommitData *commitData) {
}
}
// Bookkeeping used to decide how many transactions may be started in one batch
// for a given rate.
//   rate   - the current rate; multiplied by elapsed time in reset()
//   budget - carry-over from the previous batch (may be negative)
//   limit  - the number of transactions allowed in the current batch
struct TransactionRateInfo {
	double rate;
	double budget;
	double limit;

	TransactionRateInfo(double rate) : rate(rate), budget(0), limit(0) {}

	// Recomputes limit for a batch covering `elapsed` time: rate * elapsed, capped by
	// START_TRANSACTION_MAX_TRANSACTIONS_TO_START, plus the carried-over budget.
	void reset(double elapsed) {
		const double allowance = std::min(rate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
		limit = allowance + budget;
	}

	// Returns true if numToStart more transactions may be started given that
	// numAlreadyStarted were already started in this batch.
	bool canStart(int64_t numToStart, int64_t numAlreadyStarted) {
		// Fits entirely under the limit: no random draw is made.
		if (numToStart + numAlreadyStarted < limit) {
			return true;
		}
		// Otherwise admit probabilistically, measured against the limit without any
		// positive budget.
		return numToStart * g_random->random01() + numAlreadyStarted < limit - std::max(0.0, budget);
	}

	// Stores the unused (or overdrawn) part of limit as the next budget, clamped to
	// +/- START_TRANSACTION_MAX_BUDGET_SIZE.
	void updateBudget(int64_t numStarted) {
		const double maxBudget = SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE;
		const double remaining = std::min<double>(limit - numStarted, maxBudget);
		budget = std::max(remaining, -maxBudget);
	}
};
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests, ProxyStats *stats) {
GetReadVersionReply reply = wait(replyFuture);
double end = timer();
@ -1081,21 +1118,23 @@ ACTOR static Future<Void> transactionStarter(
MasterProxyInterface proxy,
Reference<AsyncVar<ServerDBInfo>> db,
PromiseStream<Future<Void>> addActor,
ProxyCommitData* commitData
)
ProxyCommitData* commitData, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply)
{
state double lastGRVTime = 0;
state PromiseStream<Void> GRVTimer;
state double GRVBatchTime = SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MIN;
state int64_t transactionCount = 0;
state double transactionBudget = 0;
state double transactionRate = 10;
state int64_t batchTransactionCount = 0;
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send( getRate(proxy.id(), db, &transactionCount, &transactionRate) );
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
// Get a list of the other proxies that go together with us
@ -1118,7 +1157,9 @@ ACTOR static Future<Void> transactionStarter(
lastGRVTime = t;
if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminant multiplication with infinite transaction rate
double nTransactionsToStart = std::min(transactionRate * elapsed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START) + transactionBudget;
normalRateInfo.reset(elapsed);
batchRateInfo.reset(elapsed);
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
@ -1129,13 +1170,17 @@ ACTOR static Future<Void> transactionStarter(
Optional<UID> debugID;
double leftToStart = 0;
double batchLeftToStart = 0;
while (!transactionQueue.empty()) {
auto& req = transactionQueue.top().first;
int tc = req.transactionCount;
leftToStart = nTransactionsToStart - transactionsStarted[0] - transactionsStarted[1];
bool startNext = tc < leftToStart || req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE || tc * g_random->random01() < leftToStart - std::max(0.0, transactionBudget);
if (!startNext) break;
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
break;
}
else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(tc, transactionsStarted[0] + transactionsStarted[1])) {
break;
}
if (req.debugID.present()) {
if (!debugID.present()) debugID = g_nondeterministic_random->randomUniqueID();
@ -1166,10 +1211,15 @@ ACTOR static Future<Void> transactionStarter(
.detail("NumSystemTransactionsStarted", systemTransactionsStarted[0] + systemTransactionsStarted[1])
.detail("NumNonSystemTransactionsStarted", transactionsStarted[0] + transactionsStarted[1] - systemTransactionsStarted[0] - systemTransactionsStarted[1])
.detail("TransactionBudget", transactionBudget)
.detail("LastLeftToStart", leftToStart);*/
.detail("BatchTransactionBudget", batchTransactionBudget)
.detail("LastLeftToStart", leftToStart)
.detail("LastBatchLeftToStart", batchLeftToStart);*/
transactionCount += transactionsStarted[0] + transactionsStarted[1];
transactionBudget = std::max(std::min(nTransactionsToStart - transactionsStarted[0] - transactionsStarted[1], SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE), -SERVER_KNOBS->START_TRANSACTION_MAX_BUDGET_SIZE);
batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
@ -1304,6 +1354,22 @@ ACTOR static Future<Void> readRequestServer(
}
}
// Serves GetHealthMetricsRequest messages arriving on proxy.getHealthMetrics.
// Each request is answered immediately with a copy of one of the two replies the
// caller maintains: *detailedHealthMetricsReply when req.detailed is set, otherwise
// *healthMetricsReply. Runs until cancelled.
ACTOR Future<Void> healthMetricsRequestServer(MasterProxyInterface proxy, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply)
{
	loop {
		GetHealthMetricsRequest req = waitNext(proxy.getHealthMetrics.getFuture());
		req.reply.send(req.detailed ? *detailedHealthMetricsReply : *healthMetricsReply);
	}
}
ACTOR Future<Void> monitorRemoteCommitted(ProxyCommitData* self, Reference<AsyncVar<ServerDBInfo>> db) {
loop {
wait(delay(0)); //allow this actor to be cancelled if we are removed after db changes.
@ -1382,6 +1448,9 @@ ACTOR Future<Void> masterProxyServerCore(
state std::set<Sequence> txnSequences;
state Sequence maxSequence = std::numeric_limits<Sequence>::max();
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
addActor.send( fetchVersions(&commitData) );
addActor.send( waitFailureServer(proxy.waitFailure.getFuture()) );
@ -1412,8 +1481,9 @@ ACTOR Future<Void> masterProxyServerCore(
TraceEvent(SevInfo, "CommitBatchesMemoryLimit").detail("BytesLimit", commitBatchesMemoryLimit);
addActor.send(monitorRemoteCommitted(&commitData, db));
addActor.send(transactionStarter(proxy, db, addActor, &commitData));
addActor.send(transactionStarter(proxy, db, addActor, &commitData, &healthMetricsReply, &detailedHealthMetricsReply));
addActor.send(readRequestServer(proxy, &commitData));
addActor.send(healthMetricsRequestServer(proxy, &healthMetricsReply, &detailedHealthMetricsReply));
// wait for txnStateStore recovery
wait(success(commitData.txnStateStore->readValue(StringRef())));

View File

@ -85,11 +85,10 @@ struct StorageQueueInfo {
Smoother smoothDurableVersion, smoothLatestVersion;
Smoother smoothFreeSpace;
Smoother smoothTotalSpace;
limitReason_t limitReason;
StorageQueueInfo(UID id, LocalityData locality) : valid(false), id(id), locality(locality), smoothDurableBytes(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothInputBytes(SERVER_KNOBS->SMOOTHING_AMOUNT), verySmoothDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
smoothDurableVersion(1.), smoothLatestVersion(1.), smoothFreeSpace(SERVER_KNOBS->SMOOTHING_AMOUNT),
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT), limitReason(limitReason_t::unlimited)
smoothTotalSpace(SERVER_KNOBS->SMOOTHING_AMOUNT)
{
// FIXME: this is a tacky workaround for a potential uninitialized use in trackStorageServerQueueInfo
lastReply.instanceID = -1;
@ -112,25 +111,63 @@ struct TLogQueueInfo {
}
};
// One set of throttling thresholds plus the transaction rate limit computed from them.
// The Ratekeeper struct in this file holds two instances: one built with an empty
// context and one built with the context "Batch".
struct RatekeeperLimits {
	// Most recently computed limit; starts out unlimited (infinity).
	double tpsLimit = std::numeric_limits<double>::infinity();

	// Metric handles named "Ratekeeper.TPSLimit<context>" and "Ratekeeper.Reason<context>".
	Int64MetricHandle tpsLimitMetric;
	Int64MetricHandle reasonMetric;

	// Queue thresholds for storage servers.
	int64_t storageTargetBytes;
	int64_t storageSpringBytes;

	// Queue thresholds for transaction logs.
	int64_t logTargetBytes;
	int64_t logSpringBytes;

	// Version-difference threshold between tlogs and storage servers.
	int64_t maxVersionDifference;

	// Suffix appended to the metric names above (and to trace event names by updateRate).
	std::string context;

	// Stores the supplied thresholds verbatim and creates the two metric handles.
	// The metric names are built from the `context` argument, not from the member,
	// so they do not depend on member declaration order.
	RatekeeperLimits(
		std::string context,
		int64_t storageTargetBytes,
		int64_t storageSpringBytes,
		int64_t logTargetBytes,
		int64_t logSpringBytes,
		int64_t maxVersionDifference)
	  : tpsLimitMetric(StringRef("Ratekeeper.TPSLimit" + context)),
	    reasonMetric(StringRef("Ratekeeper.Reason" + context)),
	    storageTargetBytes(storageTargetBytes),
	    storageSpringBytes(storageSpringBytes),
	    logTargetBytes(logTargetBytes),
	    logSpringBytes(logSpringBytes),
	    maxVersionDifference(maxVersionDifference),
	    context(context)
	{}
};
// Last counters reported by a single proxy, as recorded by the rateKeeper actor:
// `total` and `batch` hold the proxy's reported total/batch released-transaction
// counts, and `time` holds the now() value at which they were recorded.
struct TransactionCounts {
	int64_t total = 0;
	int64_t batch = 0;
	double time = 0;

	// Everything starts at zero (see the default member initializers above).
	TransactionCounts() {}
};
struct Ratekeeper {
Map<UID, StorageQueueInfo> storageQueueInfo;
Map<UID, TLogQueueInfo> tlogQueueInfo;
std::map<UID, std::pair<int64_t, double> > proxy_transactionCountAndTime;
Smoother smoothReleasedTransactions, smoothTotalDurableBytes;
double TPSLimit;
std::map<UID, TransactionCounts> proxy_transactionCounts;
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
HealthMetrics healthMetrics;
DatabaseConfiguration configuration;
Int64MetricHandle tpsLimitMetric;
Int64MetricHandle actualTpsMetric;
Int64MetricHandle reasonMetric;
double lastWarning;
double* lastLimited;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT), TPSLimit(std::numeric_limits<double>::infinity()),
tpsLimitMetric(LiteralStringRef("Ratekeeper.TPSLimit")),
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
Ratekeeper() : smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
reasonMetric(LiteralStringRef("Ratekeeper.Reason")),
lastWarning(0)
lastWarning(0),
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE),
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH)
{}
};
@ -152,6 +189,8 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
myQueueInfo->value.smoothInputBytes.reset(reply.get().bytesInput);
myQueueInfo->value.smoothFreeSpace.reset(reply.get().storageBytes.available);
myQueueInfo->value.smoothTotalSpace.reset(reply.get().storageBytes.total);
myQueueInfo->value.smoothDurableVersion.reset(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.reset(reply.get().version);
} else {
self->smoothTotalDurableBytes.addDelta( reply.get().bytesDurable - myQueueInfo->value.prevReply.bytesDurable );
myQueueInfo->value.smoothDurableBytes.setTotal( reply.get().bytesDurable );
@ -159,6 +198,8 @@ ACTOR Future<Void> trackStorageServerQueueInfo( Ratekeeper* self, StorageServerI
myQueueInfo->value.smoothInputBytes.setTotal( reply.get().bytesInput );
myQueueInfo->value.smoothFreeSpace.setTotal( reply.get().storageBytes.available );
myQueueInfo->value.smoothTotalSpace.setTotal( reply.get().storageBytes.total );
myQueueInfo->value.smoothDurableVersion.setTotal(reply.get().durableVersion);
myQueueInfo->value.smoothLatestVersion.setTotal(reply.get().version);
}
} else {
if(myQueueInfo->value.valid) {
@ -248,15 +289,15 @@ ACTOR Future<Void> trackEachStorageServer(
}
}
void updateRate( Ratekeeper* self ) {
void updateRate( Ratekeeper* self, RatekeeperLimits &limits ) {
//double controlFactor = ; // dt / eFoldingTime
double actualTPS = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTPS;
double actualTps = self->smoothReleasedTransactions.smoothRate();
self->actualTpsMetric = (int64_t)actualTps;
// SOMEDAY: Remove the max( 1.0, ... ) since the below calculations _should_ be able to recover back up from this value
actualTPS = std::max( std::max( 1.0, actualTPS ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
actualTps = std::max( std::max( 1.0, actualTps ), self->smoothTotalDurableBytes.smoothRate() / CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT );
self->TPSLimit = std::numeric_limits<double>::infinity();
limits.tpsLimit = std::numeric_limits<double>::infinity();
UID reasonID = UID();
limitReason_t limitReason = limitReason_t::unlimited;
@ -264,9 +305,11 @@ void updateRate( Ratekeeper* self ) {
int64_t worstFreeSpaceStorageServer = std::numeric_limits<int64_t>::max();
int64_t worstStorageQueueStorageServer = 0;
int64_t worstStorageDurabilityLagStorageServer = 0;
int64_t limitingStorageQueueStorageServer = 0;
std::multimap<double, StorageQueueInfo*> storageTPSLimitReverseIndex;
std::multimap<double, StorageQueueInfo*> storageTpsLimitReverseIndex;
std::map<UID, limitReason_t> ssReasons;
// Look at each storage server's write queue, compute and store the desired rate ratio
for(auto i = self->storageQueueInfo.begin(); i != self->storageQueueInfo.end(); ++i) {
@ -274,32 +317,43 @@ void updateRate( Ratekeeper* self ) {
if (!ss.valid) continue;
++sscount;
ss.limitReason = limitReason_t::unlimited;
limitReason_t ssLimitReason = limitReason_t::unlimited;
int64_t minFreeSpace = std::max(SERVER_KNOBS->MIN_FREE_SPACE, (int64_t)(SERVER_KNOBS->MIN_FREE_SPACE_RATIO * ss.smoothTotalSpace.smoothTotal()));
worstFreeSpaceStorageServer = std::min(worstFreeSpaceStorageServer, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.storageSpringBytes, (ss.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.storageTargetBytes, (int64_t)ss.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.storageTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
ss.limitReason = limitReason_t::storage_server_min_free_space;
ssLimitReason = limitReason_t::storage_server_min_free_space;
} else {
ss.limitReason = limitReason_t::storage_server_min_free_space_ratio;
ssLimitReason = limitReason_t::storage_server_min_free_space_ratio;
}
}
int64_t storageQueue = ss.lastReply.bytesInput - ss.smoothDurableBytes.smoothTotal();
worstStorageQueueStorageServer = std::max(worstStorageQueueStorageServer, storageQueue);
int64_t storageDurabilityLag = ss.smoothLatestVersion.smoothTotal() - ss.smoothDurableVersion.smoothTotal();
worstStorageDurabilityLagStorageServer = std::max(worstStorageDurabilityLagStorageServer, storageDurabilityLag);
auto& ssMetrics = self->healthMetrics.storageStats[ss.id];
ssMetrics.storageQueue = storageQueue;
ssMetrics.storageDurabilityLag = storageDurabilityLag;
ssMetrics.cpuUsage = ss.lastReply.cpuUsage;
ssMetrics.diskUsage = ss.lastReply.diskUsage;
int64_t b = storageQueue - targetBytes;
double targetRateRatio = std::min(( b + springBytes ) / (double)springBytes, 2.0);
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
/*if( g_random->random01() < 0.1 ) {
TraceEvent("RateKeeperUpdateRate", ss.id)
std::string name = "RateKeeperUpdateRate" + limits.context;
TraceEvent(name, ss.id)
.detail("MinFreeSpace", minFreeSpace)
.detail("SpringBytes", springBytes)
.detail("TargetBytes", targetBytes)
@ -309,7 +363,7 @@ void updateRate( Ratekeeper* self ) {
.detail("SmoothDurableBytesTotal", ss.smoothDurableBytes.smoothTotal())
.detail("TargetRateRatio", targetRateRatio)
.detail("SmoothInputBytesRate", ss.smoothInputBytes.smoothRate())
.detail("ActualTPS", actualTPS)
.detail("ActualTPS", actualTps)
.detail("InputRate", inputRate)
.detail("VerySmoothDurableBytesRate", ss.verySmoothDurableBytes.smoothRate())
.detail("B", b);
@ -317,33 +371,38 @@ void updateRate( Ratekeeper* self ) {
// Don't let any storage server use up its target bytes faster than its MVCC window!
double maxBytesPerSecond = (targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0);
double limitTPS = std::min(actualTPS * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ss.limitReason == limitReason_t::unlimited)
ss.limitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
double limitTps = std::min(actualTps * maxBytesPerSecond / std::max(1.0e-8, inputRate), maxBytesPerSecond * SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE);
if (ssLimitReason == limitReason_t::unlimited)
ssLimitReason = limitReason_t::storage_server_write_bandwidth_mvcc;
if (targetRateRatio > 0 && inputRate > 0) {
ASSERT(inputRate != 0);
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double smoothedRate = std::max( ss.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double x = smoothedRate / (inputRate * targetRateRatio);
double lim = actualTPS * x;
if (lim < limitTPS) {
limitTPS = lim;
if (ss.limitReason == limitReason_t::unlimited || ss.limitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
ss.limitReason = limitReason_t::storage_server_write_queue_size;
double lim = actualTps * x;
if (lim < limitTps) {
limitTps = lim;
if (ssLimitReason == limitReason_t::unlimited || ssLimitReason == limitReason_t::storage_server_write_bandwidth_mvcc)
ssLimitReason = limitReason_t::storage_server_write_queue_size;
}
}
storageTPSLimitReverseIndex.insert(std::make_pair(limitTPS, &ss));
storageTpsLimitReverseIndex.insert(std::make_pair(limitTps, &ss));
if(limitTPS < self->TPSLimit && (ss.limitReason == limitReason_t::storage_server_min_free_space || ss.limitReason == limitReason_t::storage_server_min_free_space_ratio)) {
if(limitTps < limits.tpsLimit && (ssLimitReason == limitReason_t::storage_server_min_free_space || ssLimitReason == limitReason_t::storage_server_min_free_space_ratio)) {
reasonID = ss.id;
self->TPSLimit = limitTPS;
limitReason = ss.limitReason;
limits.tpsLimit = limitTps;
limitReason = ssLimitReason;
}
ssReasons[ss.id] = ssLimitReason;
}
self->healthMetrics.worstStorageQueue = worstStorageQueueStorageServer;
self->healthMetrics.worstStorageDurabilityLag = worstStorageDurabilityLagStorageServer;
std::set<Optional<Standalone<StringRef>>> ignoredMachines;
for(auto ss = storageTPSLimitReverseIndex.begin(); ss != storageTPSLimitReverseIndex.end() && ss->first < self->TPSLimit; ++ss) {
for(auto ss = storageTpsLimitReverseIndex.begin(); ss != storageTpsLimitReverseIndex.end() && ss->first < limits.tpsLimit; ++ss) {
if(ignoredMachines.size() < std::min(self->configuration.storageTeamSize - 1, SERVER_KNOBS->MAX_MACHINES_FALLING_BEHIND)) {
ignoredMachines.insert(ss->second->locality.zoneId());
continue;
@ -353,9 +412,9 @@ void updateRate( Ratekeeper* self ) {
}
limitingStorageQueueStorageServer = ss->second->lastReply.bytesInput - ss->second->smoothDurableBytes.smoothTotal();
self->TPSLimit = ss->first;
limitReason = storageTPSLimitReverseIndex.begin()->second->limitReason;
reasonID = storageTPSLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
limits.tpsLimit = ss->first;
limitReason = ssReasons[storageTpsLimitReverseIndex.begin()->second->id];
reasonID = storageTpsLimitReverseIndex.begin()->second->id; // Although we aren't controlling based on the worst SS, we still report it as the limiting process
break;
}
@ -371,11 +430,11 @@ void updateRate( Ratekeeper* self ) {
auto& ss = i->value;
if (!ss.valid) continue;
minSSVer = std::min(minSSVer, ss.lastReply.v);
minSSVer = std::min(minSSVer, ss.lastReply.version);
// Machines that ratekeeper isn't controlling can fall arbitrarily far behind
if(ignoredMachines.count(i->value.locality.zoneId()) == 0) {
minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.v);
minLimitingSSVer = std::min(minLimitingSSVer, ss.lastReply.version);
}
}
@ -387,7 +446,7 @@ void updateRate( Ratekeeper* self ) {
}
// writeToReadLatencyLimit: 0 = infinite speed; 1 = TL durable speed ; 2 = half TL durable speed
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE/2) / (SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE/4);
writeToReadLatencyLimit = ((maxTLVer - minLimitingSSVer) - limits.maxVersionDifference/2) / (limits.maxVersionDifference/4);
worstVersionLag = std::max((Version)0, maxTLVer - minSSVer);
limitingVersionLag = std::max((Version)0, maxTLVer - minLimitingSSVer);
}
@ -406,9 +465,9 @@ void updateRate( Ratekeeper* self ) {
worstFreeSpaceTLog = std::min(worstFreeSpaceTLog, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace);
int64_t springBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->SPRING_BYTES_TLOG, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(SERVER_KNOBS->TARGET_BYTES_PER_TLOG, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != SERVER_KNOBS->TARGET_BYTES_PER_TLOG) {
int64_t springBytes = std::max<int64_t>(1, std::min<int64_t>(limits.logSpringBytes, (tl.smoothFreeSpace.smoothTotal() - minFreeSpace) * 0.2));
int64_t targetBytes = std::max<int64_t>(1, std::min(limits.logTargetBytes, (int64_t)tl.smoothFreeSpace.smoothTotal() - minFreeSpace));
if (targetBytes != limits.logTargetBytes) {
if (minFreeSpace == SERVER_KNOBS->MIN_FREE_SPACE) {
tlogLimitReason = limitReason_t::log_server_min_free_space;
} else {
@ -417,6 +476,7 @@ void updateRate( Ratekeeper* self ) {
}
int64_t queue = tl.lastReply.bytesInput - tl.smoothDurableBytes.smoothTotal();
self->healthMetrics.tLogQueue[tl.id] = queue;
int64_t b = queue - targetBytes;
worstStorageQueueTLog = std::max(worstStorageQueueTLog, queue);
@ -427,7 +487,7 @@ void updateRate( Ratekeeper* self ) {
}
reasonID = tl.id;
limitReason = limitReason_t::log_server_min_free_space;
self->TPSLimit = 0.0;
limits.tpsLimit = 0.0;
}
double targetRateRatio = std::min( ( b + springBytes ) / (double)springBytes, 2.0 );
@ -440,13 +500,13 @@ void updateRate( Ratekeeper* self ) {
double inputRate = tl.smoothInputBytes.smoothRate();
if (targetRateRatio > 0) {
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTPS / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double smoothedRate = std::max( tl.verySmoothDurableBytes.smoothRate(), actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
double x = smoothedRate / (inputRate * targetRateRatio);
if (targetRateRatio < .75) //< FIXME: KNOB for 2.0
x = std::max(x, 0.95);
double lim = actualTPS * x;
if (lim < self->TPSLimit){
self->TPSLimit = lim;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
reasonID = tl.id;
limitReason = tlogLimitReason;
}
@ -454,19 +514,21 @@ void updateRate( Ratekeeper* self ) {
if (inputRate > 0) {
// Don't let any tlogs use up its target bytes faster than its MVCC window!
double x = ((targetBytes - springBytes) / ((((double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)/SERVER_KNOBS->VERSIONS_PER_SECOND) + 2.0)) / inputRate;
double lim = actualTPS * x;
if (lim < self->TPSLimit){
self->TPSLimit = lim;
double lim = actualTps * x;
if (lim < limits.tpsLimit){
limits.tpsLimit = lim;
reasonID = tl.id;
limitReason = limitReason_t::log_server_mvcc_write_bandwidth;
}
}
}
self->TPSLimit = std::max(self->TPSLimit, 0.0);
self->healthMetrics.worstTLogQueue = worstStorageQueueTLog;
limits.tpsLimit = std::max(limits.tpsLimit, 0.0);
if(g_network->isSimulated() && g_simulator.speedUpSimulation) {
self->TPSLimit = std::max(self->TPSLimit, 100.0);
limits.tpsLimit = std::max(limits.tpsLimit, 100.0);
}
int64_t totalDiskUsageBytes = 0;
@ -477,22 +539,20 @@ void updateRate( Ratekeeper* self ) {
if (s.value.valid)
totalDiskUsageBytes += s.value.lastReply.storageBytes.used;
self->tpsLimitMetric = std::min(self->TPSLimit, 1e6);
self->reasonMetric = limitReason;
limits.tpsLimitMetric = std::min(limits.tpsLimit, 1e6);
limits.reasonMetric = limitReason;
if( self->smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self->TPSLimit ) {
(*self->lastLimited) = now();
}
if (g_random->random01() < 0.1){
TraceEvent("RkUpdate")
.detail("TPSLimit", self->TPSLimit)
if (g_random->random01() < 0.1) {
std::string name = "RkUpdate" + limits.context;
TraceEvent(name.c_str())
.detail("TPSLimit", limits.tpsLimit)
.detail("Reason", limitReason)
.detail("ReasonServerID", reasonID)
.detail("ReleasedTPS", self->smoothReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTPS)
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
.detail("TPSBasis", actualTps)
.detail("StorageServers", sscount)
.detail("Proxies", self->proxy_transactionCountAndTime.size())
.detail("Proxies", self->proxy_transactionCounts.size())
.detail("TLogs", tlcount)
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
@ -502,7 +562,7 @@ void updateRate( Ratekeeper* self ) {
.detail("TotalDiskUsageBytes", totalDiskUsageBytes)
.detail("WorstStorageServerVersionLag", worstVersionLag)
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.trackLatest("RkUpdate");
.trackLatest(name.c_str());
}
}
@ -561,11 +621,18 @@ ACTOR Future<Void> rateKeeper(
choose {
when (wait( track )) { break; }
when (wait( timeout )) {
updateRate( &self );
updateRate(&self, self.normalLimits);
updateRate(&self, self.batchLimits);
if(self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit) {
*self.lastLimited = now();
}
double tooOld = now() - 1.0;
for(auto p=self.proxy_transactionCountAndTime.begin(); p!=self.proxy_transactionCountAndTime.end(); ) {
if (p->second.second < tooOld)
p = self.proxy_transactionCountAndTime.erase(p);
for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) {
if (p->second.time < tooOld)
p = self.proxy_transactionCounts.erase(p);
else
++p;
}
@ -574,16 +641,26 @@ ACTOR Future<Void> rateKeeper(
when (GetRateInfoRequest req = waitNext(getRateInfo)) {
GetRateInfoReply reply;
auto& p = self.proxy_transactionCountAndTime[ req.requesterID ];
auto& p = self.proxy_transactionCounts[ req.requesterID ];
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
if (p.first > 0)
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.first );
if (p.total > 0) {
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.total );
}
if(p.batch > 0) {
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batch );
}
p.first = req.totalReleasedTransactions;
p.second = now();
p.total = req.totalReleasedTransactions;
p.batch = req.batchReleasedTransactions;
p.time = now();
reply.transactionRate = self.TPSLimit / self.proxy_transactionCountAndTime.size();
reply.transactionRate = self.normalLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
reply.healthMetrics.tpsLimit = self.normalLimits.tpsLimit;
req.reply.send( reply );
}
when (wait(err.getFuture())) {}

View File

@ -209,37 +209,6 @@ protected:
int64_t counter;
};
// Parses a double from the start of `s` using sscanf("%lf").
//
// In strict mode (the default) the conversion must consume the entire string;
// with `permissive` set, trailing characters after the number are tolerated.
// Throws attribute_not_found() when no number can be read or, in strict mode,
// when unparsed characters remain.
static double parseDouble(std::string const& s, bool permissive = false) {
	double parsed = 0;
	int charsRead = 0;
	int fieldsMatched = sscanf(s.c_str(), "%lf%n", &parsed, &charsRead);

	// No numeric prefix at all (also covers sscanf reporting EOF).
	if (fieldsMatched != 1)
		throw attribute_not_found();

	// Strict mode: reject input with anything left over after the number.
	if (!permissive && !(charsRead == s.size()))
		throw attribute_not_found();

	return parsed;
}
// Parses an int from the start of `s` using sscanf("%lld").
//
// In strict mode (the default) the conversion must consume the entire string;
// with `permissive` set, trailing characters after the number are tolerated.
// Throws attribute_not_found() when no acceptable number can be read, and
// attribute_too_large() when the parsed value does not fit in an int.
static int parseInt(std::string const& s, bool permissive = false) {
	long long int wide = 0;
	int charsRead = 0;
	int fieldsMatched = sscanf(s.c_str(), "%lld%n", &wide, &charsRead);

	bool accepted = fieldsMatched == 1 && (charsRead == s.size() || permissive);
	if (!accepted)
		throw attribute_not_found();

	// The value was read into a wider type so the range check happens before narrowing.
	if (wide < std::numeric_limits<int>::min() || std::numeric_limits<int>::max() < wide)
		throw attribute_too_large();

	return static_cast<int>(wide);
}
// Parses a 64-bit integer from the start of `s` using sscanf("%lld").
//
// In strict mode (the default) the conversion must consume the entire string;
// with `permissive` set, trailing characters after the number are tolerated.
// Throws attribute_not_found() when no number can be read or, in strict mode,
// when unparsed characters remain.
static int64_t parseInt64(std::string const& s, bool permissive = false) {
	long long int parsed = 0;
	int charsRead = 0;
	int fieldsMatched = sscanf(s.c_str(), "%lld%n", &parsed, &charsRead);

	// No numeric prefix at all (also covers sscanf reporting EOF).
	if (fieldsMatched != 1)
		throw attribute_not_found();

	// Strict mode: reject input with anything left over after the number.
	if (!permissive && !(charsRead == s.size()))
		throw attribute_not_found();

	return parsed;
}
static JsonBuilderObject getLocalityInfo(const LocalityData& locality) {
JsonBuilderObject localityObj;
@ -345,8 +314,8 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
statusObj["memory"] = memoryObj;
JsonBuilderObject cpuObj;
double cpu_seconds = parseDouble(event.getValue("CPUSeconds"));
double elapsed = parseDouble(event.getValue("Elapsed"));
double cpu_seconds = event.getDouble("CPUSeconds");
double elapsed = event.getDouble("Elapsed");
if (elapsed > 0){
cpuObj["logical_core_utilization"] = std::max(0.0, std::min(cpu_seconds / elapsed, 1.0));
}
@ -356,7 +325,7 @@ static JsonBuilderObject machineStatusFetcher(WorkerEvents mMetrics, vector<std:
networkObj["megabits_sent"] = JsonBuilderObject().setKeyRawNumber("hz", event.getValue("MbpsSent"));
networkObj["megabits_received"] = JsonBuilderObject().setKeyRawNumber("hz", event.getValue("MbpsReceived"));
metric = parseDouble(event.getValue("RetransSegs"));
metric = event.getDouble("RetransSegs");
JsonBuilderObject retransSegsObj;
if (elapsed > 0){
retransSegsObj["hz"] = metric / elapsed;
@ -460,13 +429,13 @@ struct RolesInfo {
obj["mutation_bytes"] = StatusCounter(storageMetrics.getValue("MutationBytes")).getStatus();
obj["mutations"] = StatusCounter(storageMetrics.getValue("Mutations")).getStatus();
Version version = parseInt64(storageMetrics.getValue("Version"));
Version durableVersion = parseInt64(storageMetrics.getValue("DurableVersion"));
Version version = storageMetrics.getInt64("Version");
Version durableVersion = storageMetrics.getInt64("DurableVersion");
obj["data_version"] = version;
obj["durable_version"] = durableVersion;
int64_t versionLag = parseInt64(storageMetrics.getValue("VersionLag"));
int64_t versionLag = storageMetrics.getInt64("VersionLag");
if(maxTLogVersion > 0) {
// It's possible that the storage server hasn't talked to the logs recently, in which case it may not be aware of how far behind it is.
// To account for that, we also compute the version difference between each storage server and the tlog with the largest version.
@ -522,7 +491,7 @@ struct RolesInfo {
obj.setKeyRawNumber("queue_disk_total_bytes", tlogMetrics.getValue("QueueDiskBytesTotal"));
obj["input_bytes"] = StatusCounter(tlogMetrics.getValue("BytesInput")).getStatus();
obj["durable_bytes"] = StatusCounter(tlogMetrics.getValue("BytesDurable")).getStatus();
metricVersion = parseInt64(tlogMetrics.getValue("Version"));
metricVersion = tlogMetrics.getInt64("Version");
obj["data_version"] = metricVersion;
} catch (Error& e) {
if(e.code() != error_code_attribute_not_found)
@ -622,7 +591,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
if(memInfo->second.valid()) {
if(processMetrics.size() > 0) {
memInfo->second.memoryUsage += parseDouble(processMetrics.getValue("Memory"));
memInfo->second.memoryUsage += processMetrics.getDouble("Memory");
++memInfo->second.numProcesses;
}
else
@ -697,11 +666,11 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
statusObj.setKeyRawNumber("uptime_seconds",event.getValue("UptimeSeconds"));
// rates are calculated over the last elapsed seconds
double elapsed = parseDouble(event.getValue("Elapsed"));;
double cpu_seconds = parseDouble(event.getValue("CPUSeconds"));
double diskIdleSeconds = parseDouble(event.getValue("DiskIdleSeconds"));
double diskReads = parseDouble(event.getValue("DiskReads"));
double diskWrites = parseDouble(event.getValue("DiskWrites"));
double elapsed = event.getDouble("Elapsed");
double cpu_seconds = event.getDouble("CPUSeconds");
double diskIdleSeconds = event.getDouble("DiskIdleSeconds");
double diskReads = event.getDouble("DiskReads");
double diskWrites = event.getDouble("DiskWrites");
JsonBuilderObject diskObj;
if (elapsed > 0){
@ -779,7 +748,7 @@ ACTOR static Future<JsonBuilderObject> processStatusFetcher(
// if this process address is in the machine metrics
if (mMetrics.count(address) && mMetrics[address].size()){
double availableMemory;
availableMemory = parseDouble(mMetrics[address].getValue("AvailableMemory"));
availableMemory = mMetrics[address].getDouble("AvailableMemory");
auto machineMemInfo = machineMemoryUsage[workerItr->first.locality.machineId()];
if (machineMemInfo.valid()) {
@ -883,7 +852,7 @@ ACTOR static Future<JsonBuilderObject> recoveryStateStatusFetcher(std::pair<Work
try {
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( md.getValue("StatusCode") );
state int mStatusCode = md.getInt("StatusCode");
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -1162,9 +1131,9 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
// If we have a MovingData message, parse it.
if (md.size())
{
int64_t partitionsInQueue = parseInt64(md.getValue("InQueue"));
int64_t partitionsInFlight = parseInt64(md.getValue("InFlight"));
int64_t averagePartitionSize = parseInt64(md.getValue("AverageShardSize"));
int64_t partitionsInQueue = md.getInt64("InQueue");
int64_t partitionsInFlight = md.getInt64("InFlight");
int64_t averagePartitionSize = md.getInt64("AverageShardSize");
if( averagePartitionSize >= 0 ) {
JsonBuilderObject moving_data;
@ -1192,8 +1161,8 @@ ACTOR static Future<JsonBuilderObject> dataStatusFetcher(std::pair<WorkerInterfa
continue;
}
bool primary = parseInt(inFlight.getValue("Primary"));
int highestPriority = parseInt(inFlight.getValue("HighestPriority"));
bool primary = inFlight.getInt("Primary");
int highestPriority = inFlight.getInt("HighestPriority");
JsonBuilderObject team_tracker;
team_tracker["primary"] = primary;
@ -1388,6 +1357,30 @@ static int getExtraTLogEligibleMachines(const vector<std::pair<WorkerInterface,
return extraTlogEligibleMachines;
}
// Builds the "performance limited by" status object from a ratekeeper trace event.
//
// ratekeeper  - event fields; "Reason" is always read, "ReasonServerID" only when a
//               known limiting reason is being reported.
// transPerSec - observed released transaction rate.
// tpsLimit    - the rate limit to compare against.
//
// Result:
//  * rate at or below 80% of the limit -> a "workload" message (not saturated);
//  * rate above 80% and the reason is in [0, limitReasonEnd) -> that reason's
//    name/description, plus "reason_server_id" when the event carries a non-empty one;
//  * rate above 80% with an unrecognised reason -> an empty object.
// Any non-empty result also carries "reason_id".
JsonBuilderObject getPerfLimit(TraceEventFields const& ratekeeper, double transPerSec, double tpsLimit) {
	// Read first so that a missing "Reason" field fails before anything is built.
	int reasonCode = ratekeeper.getInt("Reason");
	bool nearLimit = transPerSec > tpsLimit * 0.8;

	JsonBuilderObject result;

	if (!nearLimit) {
		result = JsonString::makeMessage("workload", "The database is not being saturated by the workload.");
	}
	else if (reasonCode >= 0 && reasonCode < limitReasonEnd) {
		// Known reason: report it; an unknown reason leaves the result empty.
		result = JsonString::makeMessage(limitReasonName[reasonCode], limitReasonDesc[reasonCode]);

		std::string serverId = ratekeeper.getValue("ReasonServerID");
		if (!serverId.empty())
			result["reason_server_id"] = serverId;
	}

	if (!result.empty()) {
		result["reason_id"] = reasonCode;
	}

	return result;
}
ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker, std::pair<WorkerInterface, ProcessClass> ddWorker,
JsonBuilderObject *qos, JsonBuilderObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, EventMap>>>> storageServerFuture)
{
@ -1440,50 +1433,46 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
// Transactions
try {
TraceEventFields md = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(md.getValue("TPSLimit"));
double transPerSec = parseDouble(md.getValue("ReleasedTPS"));
int ssCount = parseInt(md.getValue("StorageServers"));
int tlogCount = parseInt(md.getValue("TLogs"));
int64_t worstFreeSpaceStorageServer = parseInt64(md.getValue("WorstFreeSpaceStorageServer"));
int64_t worstFreeSpaceTLog = parseInt64(md.getValue("WorstFreeSpaceTLog"));
(*data_overlay).setKeyRawNumber("total_disk_used_bytes",md.getValue("TotalDiskUsageBytes"));
state TraceEventFields ratekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
TraceEventFields batchRatekeeper = wait( timeoutError(ddWorker.first.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
double tpsLimit = ratekeeper.getDouble("TPSLimit");
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
int ssCount = ratekeeper.getInt("StorageServers");
int tlogCount = ratekeeper.getInt("TLogs");
int64_t worstFreeSpaceStorageServer = ratekeeper.getInt64("WorstFreeSpaceStorageServer");
int64_t worstFreeSpaceTLog = ratekeeper.getInt64("WorstFreeSpaceTLog");
(*data_overlay).setKeyRawNumber("total_disk_used_bytes",ratekeeper.getValue("TotalDiskUsageBytes"));
if(ssCount > 0) {
(*data_overlay)["least_operating_space_bytes_storage_server"] = std::max(worstFreeSpaceStorageServer, (int64_t)0);
(*qos).setKeyRawNumber("worst_queue_bytes_storage_server",md.getValue("WorstStorageServerQueue"));
(*qos).setKeyRawNumber("limiting_queue_bytes_storage_server",md.getValue("LimitingStorageServerQueue"));
(*qos).setKeyRawNumber("worst_version_lag_storage_server",md.getValue("WorstStorageServerVersionLag"));
(*qos).setKeyRawNumber("limiting_version_lag_storage_server",md.getValue("LimitingStorageServerVersionLag"));
(*qos).setKeyRawNumber("worst_queue_bytes_storage_server", ratekeeper.getValue("WorstStorageServerQueue"));
(*qos).setKeyRawNumber("limiting_queue_bytes_storage_server", ratekeeper.getValue("LimitingStorageServerQueue"));
(*qos).setKeyRawNumber("worst_version_lag_storage_server", ratekeeper.getValue("WorstStorageServerVersionLag"));
(*qos).setKeyRawNumber("limiting_version_lag_storage_server", ratekeeper.getValue("LimitingStorageServerVersionLag"));
}
if(tlogCount > 0) {
(*data_overlay)["least_operating_space_bytes_log_server"] = std::max(worstFreeSpaceTLog, (int64_t)0);
(*qos).setKeyRawNumber("worst_queue_bytes_log_server",md.getValue("WorstTLogQueue"));
(*qos).setKeyRawNumber("worst_queue_bytes_log_server", ratekeeper.getValue("WorstTLogQueue"));
}
(*qos)["transactions_per_second_limit"] = tpsLimit;
(*qos)["batch_transactions_per_second_limit"] = batchTpsLimit;
(*qos)["released_transactions_per_second"] = transPerSec;
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
int reason = parseInt(md.getValue("Reason"));
JsonBuilderObject perfLimit;
if (transPerSec > tpsLimit * 0.8) {
// If reason is known, set qos.performance_limited_by, otherwise omit
if (reason >= 0 && reason < limitReasonEnd) {
perfLimit = JsonString::makeMessage(limitReasonName[reason], limitReasonDesc[reason]);
std::string reason_server_id = md.getValue("ReasonServerID");
if (!reason_server_id.empty())
perfLimit["reason_server_id"] = reason_server_id;
}
}
else {
perfLimit = JsonString::makeMessage("workload", "The database is not being saturated by the workload.");
}
JsonBuilderObject perfLimit = getPerfLimit(ratekeeper, transPerSec, tpsLimit);
if(!perfLimit.empty()) {
perfLimit["reason_id"] = reason;
(*qos)["performance_limited_by"] = perfLimit;
}
JsonBuilderObject batchPerfLimit = getPerfLimit(batchRatekeeper, transPerSec, batchTpsLimit);
if(!batchPerfLimit.empty()) {
(*qos)["batch_performance_limited_by"] = batchPerfLimit;
}
} catch (Error &e){
if (e.code() == error_code_actor_cancelled)
throw;

View File

@ -117,6 +117,7 @@
<ActorCompiler Include="workloads\SelectorCorrectness.actor.cpp" />
<ActorCompiler Include="workloads\KVStoreTest.actor.cpp" />
<ActorCompiler Include="workloads\StreamingRead.actor.cpp" />
<ActorCompiler Include="workloads\Throttling.actor.cpp" />
<ActorCompiler Include="workloads\Throughput.actor.cpp" />
<ActorCompiler Include="workloads\WriteBandwidth.actor.cpp" />
<ActorCompiler Include="workloads\QueuePush.actor.cpp" />

View File

@ -80,6 +80,9 @@
<ActorCompiler Include="workloads\ConflictRange.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\Throttling.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>
<ActorCompiler Include="workloads\Throughput.actor.cpp">
<Filter>workloads</Filter>
</ActorCompiler>

View File

@ -24,6 +24,7 @@
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
#include "flow/SystemMonitor.h"
#include "flow/Util.h"
#include "fdbclient/Atomic.h"
#include "fdbclient/KeyRangeMap.h"
@ -296,6 +297,8 @@ public:
Version poppedAllAfter;
std::map<Version, Arena> freeable; // for each version, an Arena that must be held until that version is < oldestVersion
Arena lastArena;
double cpuUsage;
double diskUsage;
std::map<Version, Standalone<VersionUpdateRef>> const & getMutationLog() { return mutationLog; }
std::map<Version, Standalone<VersionUpdateRef>>& getMutableMutationLog() { return mutationLog; }
@ -511,7 +514,7 @@ public:
logProtocol(0), counters(this), tag(invalidTag), maxQueryQueue(0), thisServerID(ssi.id()),
readQueueSizeMetric(LiteralStringRef("StorageServer.ReadQueueSize")),
behind(false), byteSampleClears(false, LiteralStringRef("\xff\xff\xff")), noRecentUpdates(false),
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max())
lastUpdate(now()), poppedAllAfter(std::numeric_limits<Version>::max()), cpuUsage(0.0), diskUsage(0.0)
{
version.initMetric(LiteralStringRef("StorageServer.Version"), counters.cc.id);
oldestVersion.initMetric(LiteralStringRef("StorageServer.OldestVersion"), counters.cc.id);
@ -587,7 +590,7 @@ public:
}
double getPenalty() {
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
return std::max(1.0, (queueSize() - (SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER - 2.0*SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER)) / SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
}
};
@ -684,6 +687,24 @@ void validate(StorageServer* data, bool force = false) {
}
#pragma endregion
// Refreshes the CPU and disk utilization percentages cached on the storage
// server (self->cpuUsage / self->diskUsage).
//
// In simulation both values are pinned to 100.0. Otherwise they are derived
// from the latest SystemStatistics sample:
//   cpuUsage  = 100 * processCPUSeconds / elapsed
//   diskUsage = 100 * max(0, (elapsed - processDiskIdleSeconds) / elapsed)
// When the sample is not initialized, or covers no elapsed time, the
// previously cached values are left untouched.
void updateProcessStats(StorageServer* self) {
	if (g_network->isSimulated()) {
		// diskUsage and cpuUsage are not relevant in the simulator,
		// and relying on the actual values could break seed determinism
		self->cpuUsage = 100.0;
		self->diskUsage = 100.0;
		return;
	}

	SystemStatistics sysStats = getSystemStatistics();
	// Both formulas divide by sysStats.elapsed; a zero-length sample would
	// store NaN/inf in the cached metrics, so such samples are skipped.
	if (sysStats.initialized && sysStats.elapsed > 0) {
		self->cpuUsage = 100 * sysStats.processCPUSeconds / sysStats.elapsed;
		self->diskUsage = 100 * std::max(0.0, (sysStats.elapsed - sysStats.processDiskIdleSeconds) / sysStats.elapsed);
	}
}
///////////////////////////////////// Queries /////////////////////////////////
#pragma region Queries
ACTOR Future<Version> waitForVersion( StorageServer* data, Version version ) {
@ -1431,7 +1452,10 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const&
reply.storageBytes = self->storage.getStorageBytes();
reply.v = self->version.get();
reply.version = self->version.get();
reply.cpuUsage = self->cpuUsage;
reply.diskUsage = self->diskUsage;
reply.durableVersion = self->durableVersion.get();
req.reply.send( reply );
}
@ -3317,6 +3341,8 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
state double lastLoopTopTime = now();
state Future<Void> dbInfoChange = Void();
state Future<Void> checkLastUpdate = Void();
state double updateProcessStatsDelay = SERVER_KNOBS->UPDATE_STORAGE_PROCESS_STATS_INTERVAL;
state Future<Void> updateProcessStatsTimer = delay(updateProcessStatsDelay);
actors.add(updateStorage(self));
actors.add(waitFailureServer(ssi.waitFailure.getFuture()));
@ -3428,6 +3454,10 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
else
doUpdate = update( self, &updateReceived );
}
when(wait(updateProcessStatsTimer)) {
updateProcessStats(self);
updateProcessStatsTimer = delay(updateProcessStatsDelay);
}
when(wait(actors.getResult())) {}
}
}

View File

@ -150,8 +150,10 @@ int getOption( VectorRef<KeyValueRef> options, Key key, int defaultValue) {
if( sscanf(options[i].value.toString().c_str(), "%d", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;
@ -164,8 +166,10 @@ uint64_t getOption( VectorRef<KeyValueRef> options, Key key, uint64_t defaultVal
if( sscanf(options[i].value.toString().c_str(), "%lld", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;
@ -178,8 +182,10 @@ int64_t getOption( VectorRef<KeyValueRef> options, Key key, int64_t defaultValue
if( sscanf(options[i].value.toString().c_str(), "%lld", &r) ) {
options[i].value = LiteralStringRef("");
return r;
} else
} else {
TraceEvent(SevError, "InvalidTestOption").detail("OptionName", printable(key));
throw test_specification_invalid();
}
}
return defaultValue;

View File

@ -1092,7 +1092,8 @@ struct FuzzApiCorrectnessWorkload : TestWorkload {
contract = {
std::make_pair( error_code_invalid_option_value, ExceptionContract::Possible ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::possibleIf((FDBTransactionOptions::Option)op == FDBTransactionOptions::READ_YOUR_WRITES_DISABLE) ),
std::make_pair( error_code_client_invalid_operation, ExceptionContract::possibleIf((FDBTransactionOptions::Option)op == FDBTransactionOptions::READ_YOUR_WRITES_DISABLE ||
(FDBTransactionOptions::Option)op == FDBTransactionOptions::LOG_TRANSACTION) ),
std::make_pair( error_code_read_version_already_set, ExceptionContract::possibleIf((FDBTransactionOptions::Option)op == FDBTransactionOptions::INITIALIZE_NEW_DATABASE) )
};
}

View File

@ -96,6 +96,9 @@ struct ReadWriteWorkload : KVWorkload {
bool useRYW;
bool rampTransactionType;
bool rampUpConcurrency;
bool batchPriority;
Standalone<StringRef> descriptionString;
Int64MetricHandle totalReadsMetric;
Int64MetricHandle totalRetriesMetric;
@ -174,6 +177,8 @@ struct ReadWriteWorkload : KVWorkload {
rampTransactionType = getOption(options, LiteralStringRef("rampTransactionType"), false);
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
doSetup = getOption(options, LiteralStringRef("setup"), true);
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
if (rampUpConcurrency) ASSERT( rampSweepCount == 2 ); // Implementation is hard coded to ramp up and down
@ -213,7 +218,7 @@ struct ReadWriteWorkload : KVWorkload {
}
}
virtual std::string description() { return "ReadWrite"; }
virtual std::string description() { return descriptionString.toString(); }
virtual Future<Void> setup( Database const& cx ) { return _setup( cx, this ); }
virtual Future<Void> start( Database const& cx ) { return _start( cx, this ); }
@ -304,6 +309,13 @@ struct ReadWriteWorkload : KVWorkload {
return KeyValueRef( keyForIndex( n, false ), randomValue() );
}
// Applies the workload's per-transaction options to `tr`. Currently this only
// sets PRIORITY_BATCH, and only when the batchPriority flag is enabled.
template <class Trans>
void setupTransaction(Trans *tr) {
	if(!batchPriority) {
		return;
	}
	tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
}
ACTOR static Future<Void> tracePeriodically( ReadWriteWorkload *self ) {
state double start = now();
state double elapsed = 0.0;
@ -313,10 +325,10 @@ struct ReadWriteWorkload : KVWorkload {
elapsed += self->periodicLoggingInterval;
wait( delayUntil(start + elapsed) );
TraceEvent("RW_RowReadLatency").detail("Mean", self->readLatencies.mean()).detail("Median", self->readLatencies.median()).detail("Percentile5", self->readLatencies.percentile(.05)).detail("Percentile95", self->readLatencies.percentile(.95)).detail("Count", self->readLatencyCount).detail("Elapsed", elapsed);
TraceEvent("RW_GRVLatency").detail("Mean", self->GRVLatencies.mean()).detail("Median", self->GRVLatencies.median()).detail("Percentile5", self->GRVLatencies.percentile(.05)).detail("Percentile95", self->GRVLatencies.percentile(.95));
TraceEvent("RW_CommitLatency").detail("Mean", self->commitLatencies.mean()).detail("Median", self->commitLatencies.median()).detail("Percentile5", self->commitLatencies.percentile(.05)).detail("Percentile95", self->commitLatencies.percentile(.95));
TraceEvent("RW_TotalLatency").detail("Mean", self->latencies.mean()).detail("Median", self->latencies.median()).detail("Percentile5", self->latencies.percentile(.05)).detail("Percentile95", self->latencies.percentile(.95));
TraceEvent((self->description() + "_RowReadLatency").c_str()).detail("Mean", self->readLatencies.mean()).detail("Median", self->readLatencies.median()).detail("Percentile5", self->readLatencies.percentile(.05)).detail("Percentile95", self->readLatencies.percentile(.95)).detail("Count", self->readLatencyCount).detail("Elapsed", elapsed);
TraceEvent((self->description() + "_GRVLatency").c_str()).detail("Mean", self->GRVLatencies.mean()).detail("Median", self->GRVLatencies.median()).detail("Percentile5", self->GRVLatencies.percentile(.05)).detail("Percentile95", self->GRVLatencies.percentile(.95));
TraceEvent((self->description() + "_CommitLatency").c_str()).detail("Mean", self->commitLatencies.mean()).detail("Median", self->commitLatencies.median()).detail("Percentile5", self->commitLatencies.percentile(.05)).detail("Percentile95", self->commitLatencies.percentile(.95));
TraceEvent((self->description() + "_TotalLatency").c_str()).detail("Mean", self->latencies.mean()).detail("Median", self->latencies.median()).detail("Percentile5", self->latencies.percentile(.05)).detail("Percentile95", self->latencies.percentile(.95));
int64_t ops = (self->aTransactions.getValue() * (self->readsPerTransactionA+self->writesPerTransactionA)) +
(self->bTransactions.getValue() * (self->readsPerTransactionB+self->writesPerTransactionB));
@ -456,7 +468,9 @@ struct ReadWriteWorkload : KVWorkload {
state double startTime = now();
loop {
state Transaction tr(cx);
try {
self->setupTransaction(&tr);
wait( self->readOp( &tr, keys, self, false ) );
wait( tr.warmRange( cx, allKeys ) );
break;
@ -564,6 +578,7 @@ struct ReadWriteWorkload : KVWorkload {
extra_ranges.push_back(singleKeyRange( g_random->randomUniqueID().toString() ));
state Trans tr(cx);
if(tstart - self->clientBegin > self->debugTime && tstart - self->clientBegin <= self->debugTime + self->debugInterval) {
debugID = g_random->randomUniqueID();
tr.debugTransaction(debugID);
@ -578,6 +593,8 @@ struct ReadWriteWorkload : KVWorkload {
loop{
try {
self->setupTransaction(&tr);
GRVStartTime = now();
self->transactionFailureMetric->startLatency = -1;

View File

@ -0,0 +1,254 @@
/*
* Throttling.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <boost/lexical_cast.hpp>
#include "fdbclient/ReadYourWrites.h"
#include "fdbserver/workloads/workloads.actor.h"
// Token-bucket limiter used by the Throttling workload.
//
// tokenAdder() periodically adds transactionRate * addTokensInterval tokens to
// bucketSize, never letting it exceed maxBurst. startTransaction() takes one
// token when at least one is available and otherwise backs off before retrying.
//
// NOTE(review): the constructor hands `this` to tokenAdder(), so a copied or
// moved TokenBucket would presumably leave the actor pointing at the original
// object — confirm instances are never copied.
struct TokenBucket {
	// Delay between two refills performed by tokenAdder().
	static constexpr const double addTokensInterval = 0.1;
	// Upper bound on the back-off delay used by startTransaction().
	static constexpr const double maxSleepTime = 60.0;

	double transactionRate; // refill rate; multiplied by addTokensInterval on each refill
	double maxBurst;        // cap applied to bucketSize on every refill
	double bucketSize;      // tokens currently available
	Future<Void> tokenAdderActor; // future returned by tokenAdder(this)

	// Refill loop: tops up the bucket (clamped to maxBurst), occasionally
	// traces the current state, then waits addTokensInterval.
	ACTOR static Future<Void> tokenAdder(TokenBucket* self) {
		loop {
			self->bucketSize = std::min(self->bucketSize + self->transactionRate * addTokensInterval, self->maxBurst);
			if (g_random->randomInt(0, 100) == 0)
				TraceEvent("AddingTokensx100")
				    .detail("BucketSize", self->bucketSize)
				    .detail("TransactionRate", self->transactionRate);
			wait(delay(addTokensInterval));
		}
	}

	// Starts with a full bucket and a zero refill rate; the rate is expected
	// to be assigned by the owner afterwards.
	TokenBucket(double maxBurst = 1000) : transactionRate(0), maxBurst(maxBurst), bucketSize(maxBurst) {
		tokenAdderActor = tokenAdder(this);
	}

	// Returns once a token has been taken from the bucket. While the bucket
	// holds less than one token, waits with a delay that doubles after every
	// attempt, starting at addTokensInterval and capped at maxSleepTime.
	ACTOR static Future<Void> startTransaction(TokenBucket* self) {
		state double sleepTime = addTokensInterval;
		loop {
			if (self->bucketSize >= 1.0) {
				--self->bucketSize;
				return Void();
			}
			if (g_random->randomInt(0, 100) == 0)
				TraceEvent("ThrottlingTransactionx100").detail("SleepTime", sleepTime);
			wait(delay(sleepTime));
			// std::min takes its arguments by const reference. Passing the
			// static constexpr member directly would ODR-use it, which needs
			// an out-of-class definition before C++17 and can otherwise fail
			// at link time. Passing a prvalue copy avoids that.
			sleepTime = std::min(sleepTime * 2, static_cast<double>(maxSleepTime));
		}
	}
};
// Workload that issues read/write transactions through a client-side token
// bucket whose refill rate follows the tpsLimit found in the database's health
// metrics, while recording the worst health-metric values observed. check()
// fails when the metrics stopped changing for too long, when nothing was
// committed, or when the recorded metrics do not match the
// sendDetailedHealthMetrics setting.
struct ThrottlingWorkload : KVWorkload {
	// Options read in the constructor.
	double testDuration;
	double healthMetricsCheckInterval;
	int actorsPerClient;
	int writesPerTransaction;
	int readsPerTransaction;
	double throttlingMultiplier;

	// Results accumulated while the workload runs.
	int transactionsCommitted;
	int64_t worstStorageQueue;
	int64_t worstStorageDurabilityLag;
	int64_t worstTLogQueue;
	int64_t detailedWorstStorageQueue;
	int64_t detailedWorstStorageDurabilityLag;
	int64_t detailedWorstTLogQueue;
	double detailedWorstCpuUsage;
	double detailedWorstDiskUsage;

	bool sendDetailedHealthMetrics;
	double maxAllowedStaleness;
	TokenBucket tokenBucket;
	bool healthMetricsStoppedUpdating;

	ThrottlingWorkload(WorkloadContext const& wcx)
	  : KVWorkload(wcx), transactionsCommitted(0), worstStorageQueue(0), worstStorageDurabilityLag(0),
	    worstTLogQueue(0), detailedWorstStorageQueue(0), detailedWorstStorageDurabilityLag(0),
	    detailedWorstTLogQueue(0), detailedWorstCpuUsage(0.0), detailedWorstDiskUsage(0.0),
	    healthMetricsStoppedUpdating(false) {
		testDuration = getOption(options, LiteralStringRef("testDuration"), 60.0);
		healthMetricsCheckInterval = getOption(options, LiteralStringRef("healthMetricsCheckInterval"), 1.0);
		actorsPerClient = getOption(options, LiteralStringRef("actorsPerClient"), 10);
		writesPerTransaction = getOption(options, LiteralStringRef("writesPerTransaction"), 10);
		readsPerTransaction = getOption(options, LiteralStringRef("readsPerTransaction"), 10);
		throttlingMultiplier = getOption(options, LiteralStringRef("throttlingMultiplier"), 0.5);
		sendDetailedHealthMetrics = getOption(options, LiteralStringRef("sendDetailedHealthMetrics"), true);
		maxAllowedStaleness = getOption(options, LiteralStringRef("maxAllowedStaleness"), 10.0);

		// The option is read as an int and then stored into the bucket's double field.
		int maxBurst = getOption(options, LiteralStringRef("maxBurst"), 1000);
		tokenBucket.maxBurst = maxBurst;
	}

	// Every healthMetricsCheckInterval: compares cx->healthMetrics with the
	// previous snapshot to detect staleness, re-derives the token bucket's
	// refill rate from tpsLimit, updates the worst-seen values and traces
	// both the aggregate and the per-server metrics.
	ACTOR static Future<Void> healthMetricsChecker(Database cx, ThrottlingWorkload* self) {
		state int repeated = 0;
		state HealthMetrics healthMetrics;

		loop {
			wait(delay(self->healthMetricsCheckInterval));

			if (!(healthMetrics == cx->healthMetrics)) {
				repeated = 0;
			} else if (++repeated > self->maxAllowedStaleness / self->healthMetricsCheckInterval) {
				// Unchanged for more consecutive checks than maxAllowedStaleness allows.
				self->healthMetricsStoppedUpdating = true;
			}
			healthMetrics = cx->healthMetrics;

			self->tokenBucket.transactionRate =
			    healthMetrics.tpsLimit * self->throttlingMultiplier / self->clientCount;

			self->worstStorageQueue = std::max(self->worstStorageQueue, healthMetrics.worstStorageQueue);
			self->worstStorageDurabilityLag =
			    std::max(self->worstStorageDurabilityLag, healthMetrics.worstStorageDurabilityLag);
			self->worstTLogQueue = std::max(self->worstTLogQueue, healthMetrics.worstTLogQueue);

			TraceEvent("HealthMetrics")
			    .detail("WorstStorageQueue", healthMetrics.worstStorageQueue)
			    .detail("WorstStorageDurabilityLag", healthMetrics.worstStorageDurabilityLag)
			    .detail("WorstTLogQueue", healthMetrics.worstTLogQueue)
			    .detail("TpsLimit", healthMetrics.tpsLimit);

			// One event per metric, carrying one detail per storage server.
			TraceEvent traceStorageQueue("StorageQueue");
			TraceEvent traceStorageDurabilityLag("StorageDurabilityLag");
			TraceEvent traceCpuUsage("CpuUsage");
			TraceEvent traceDiskUsage("DiskUsage");
			for (const auto& server : healthMetrics.storageStats) {
				const auto& stats = server.second;
				const std::string label = format("Storage/%s", server.first.toString().c_str());

				self->detailedWorstStorageQueue = std::max(self->detailedWorstStorageQueue, stats.storageQueue);
				traceStorageQueue.detail(label, stats.storageQueue);

				self->detailedWorstStorageDurabilityLag =
				    std::max(self->detailedWorstStorageDurabilityLag, stats.storageDurabilityLag);
				traceStorageDurabilityLag.detail(label, stats.storageDurabilityLag);

				self->detailedWorstCpuUsage = std::max(self->detailedWorstCpuUsage, stats.cpuUsage);
				traceCpuUsage.detail(label, stats.cpuUsage);

				self->detailedWorstDiskUsage = std::max(self->detailedWorstDiskUsage, stats.diskUsage);
				traceDiskUsage.detail(label, stats.diskUsage);
			}

			TraceEvent traceTLogQueue("TLogQueue");
			for (const auto& tlog : healthMetrics.tLogQueue) {
				self->detailedWorstTLogQueue = std::max(self->detailedWorstTLogQueue, tlog.second);
				traceTLogQueue.detail(format("TLog/%s", tlog.first.toString().c_str()), tlog.second);
			}
		}
	}

	// Builds a value of the form "Value/<n>" with n drawn from randomInt(0, 10e6).
	static Value getRandomValue() {
		return Standalone<StringRef>(format("Value/%d", g_random->randomInt(0, 10e6)));
	}

	// Repeatedly obtains a token, then runs one transaction that performs
	// readsPerTransaction random reads and writesPerTransaction random writes
	// and commits. Failed transactions are dropped without retry.
	ACTOR static Future<Void> clientActor(Database cx, ThrottlingWorkload* self) {
		state ReadYourWritesTransaction tr(cx);

		loop {
			wait(TokenBucket::startTransaction(&self->tokenBucket));
			tr.reset();
			try {
				state int i;
				for (i = 0; i < self->readsPerTransaction; ++i) {
					state Optional<Value> value = wait(tr.get(self->getRandomKey()));
				}
				for (i = 0; i < self->writesPerTransaction; ++i) {
					tr.set(self->getRandomKey(), getRandomValue());
				}
				wait(tr.commit());
				if (g_random->randomInt(0, 1000) == 0)
					TraceEvent("TransactionCommittedx1000");
				++self->transactionsCommitted;
			} catch (Error& e) {
				// ignore failing transactions
			}
		}
	}

	// Sets the SEND_DETAILED_HEALTH_METRICS network option to "1" or "0".
	// When detailed metrics are turned off, waits two detailed-update
	// intervals and then clears the per-server maps already held by cx.
	ACTOR static Future<Void> _setup(Database cx, ThrottlingWorkload* self) {
		Standalone<StringRef> value(format("%d", self->sendDetailedHealthMetrics ? 1 : 0));
		setNetworkOption(FDBNetworkOptions::SEND_DETAILED_HEALTH_METRICS, Optional<StringRef>(value));
		if (!self->sendDetailedHealthMetrics) {
			// Clear detailed health metrics that are already populated
			wait(delay(2 * CLIENT_KNOBS->UPDATE_DETAILED_HEALTH_METRICS_INTERVAL));
			cx->healthMetrics.storageStats.clear();
			cx->healthMetrics.tLogQueue.clear();
		}
		return Void();
	}

	// Launches the health-metrics checker and actorsPerClient client actors,
	// each wrapped in a testDuration timeout, then waits for the checker.
	ACTOR static Future<Void> _start(Database cx, ThrottlingWorkload* self) {
		state Future<Void> hmChecker = timeout(healthMetricsChecker(cx, self), self->testDuration, Void());
		state vector<Future<Void>> clientActors;
		state int actorId;
		for (actorId = 0; actorId < self->actorsPerClient; ++actorId) {
			clientActors.push_back(timeout(clientActor(cx, self), self->testDuration, Void()));
		}
		wait(hmChecker);
		return Void();
	}

	virtual std::string description() { return "Throttling"; }
	virtual Future<Void> setup(Database const& cx) { return _setup(cx, this); }
	virtual Future<Void> start(Database const& cx) { return _start(cx, this); }

	// Passes only if the metrics kept updating, at least one transaction
	// committed, every aggregate worst-value is non-zero, and the detailed
	// worst-values are all non-zero (detailed metrics on) or all zero
	// (detailed metrics off).
	virtual Future<bool> check(Database const& cx) {
		if (healthMetricsStoppedUpdating) {
			TraceEvent(SevError, "HealthMetricsStoppedUpdating");
			return false;
		}
		if (transactionsCommitted == 0) {
			TraceEvent(SevError, "NoTransactionsCommitted");
			return false;
		}

		// transactionsCommitted is known to be non-zero from here on.
		const bool aggregatesSeen =
		    worstStorageQueue != 0 && worstStorageDurabilityLag != 0 && worstTLogQueue != 0;
		const bool allDetailedSeen = detailedWorstStorageQueue != 0 && detailedWorstStorageDurabilityLag != 0 &&
		                             detailedWorstTLogQueue != 0 && detailedWorstCpuUsage != 0.0 &&
		                             detailedWorstDiskUsage != 0.0;
		const bool anyDetailedSeen = detailedWorstStorageQueue != 0 || detailedWorstStorageDurabilityLag != 0 ||
		                             detailedWorstTLogQueue != 0 || detailedWorstCpuUsage != 0.0 ||
		                             detailedWorstDiskUsage != 0.0;
		const bool detailedAsExpected = sendDetailedHealthMetrics ? allDetailedSeen : !anyDetailedSeen;

		bool correctHealthMetricsState = aggregatesSeen && detailedAsExpected;
		if (!correctHealthMetricsState) {
			TraceEvent(SevError, "IncorrectHealthMetricsState")
			    .detail("WorstStorageQueue", worstStorageQueue)
			    .detail("WorstStorageDurabilityLag", worstStorageDurabilityLag)
			    .detail("WorstTLogQueue", worstTLogQueue)
			    .detail("DetailedWorstStorageQueue", detailedWorstStorageQueue)
			    .detail("DetailedWorstStorageDurabilityLag", detailedWorstStorageDurabilityLag)
			    .detail("DetailedWorstTLogQueue", detailedWorstTLogQueue)
			    .detail("DetailedWorstCpuUsage", detailedWorstCpuUsage)
			    .detail("DetailedWorstDiskUsage", detailedWorstDiskUsage)
			    .detail("SendingDetailedHealthMetrics", sendDetailedHealthMetrics);
		}
		return correctHealthMetricsState;
	}

	// Reports the commit count and every worst-seen value as perf metrics.
	virtual void getMetrics(vector<PerfMetric>& m) {
		m.push_back(PerfMetric("TransactionsCommitted", transactionsCommitted, false));
		m.push_back(PerfMetric("WorstStorageQueue", worstStorageQueue, true));
		m.push_back(PerfMetric("DetailedWorstStorageQueue", detailedWorstStorageQueue, true));
		m.push_back(PerfMetric("WorstStorageDurabilityLag", worstStorageDurabilityLag, true));
		m.push_back(PerfMetric("DetailedWorstStorageDurabilityLag", detailedWorstStorageDurabilityLag, true));
		m.push_back(PerfMetric("WorstTLogQueue", worstTLogQueue, true));
		m.push_back(PerfMetric("DetailedWorstTLogQueue", detailedWorstTLogQueue, true));
		m.push_back(PerfMetric("DetailedWorstCpuUsage", detailedWorstCpuUsage, true));
		m.push_back(PerfMetric("DetailedWorstDiskUsage", detailedWorstDiskUsage, true));
	}
};
// File-scope factory object for ThrottlingWorkload, constructed with the name
// "Throttling" (the same string tests/Throttling.txt uses for testName).
// NOTE(review): presumably this is what lets a test spec select the workload by name — confirm against WorkloadFactory.
WorkloadFactory<ThrottlingWorkload> ThrottlingWorkloadFactory("Throttling");

View File

@ -357,7 +357,7 @@ struct WriteDuringReadWorkload : TestWorkload {
}
}
ACTOR Future<Void> commitAndUpdateMemory( ReadYourWritesTransaction *tr, WriteDuringReadWorkload* self, bool *cancelled, bool readYourWritesDisabled, bool snapshotRYWDisabled, bool readAheadDisabled, bool* doingCommit, double* startTime, Key timebombStr ) {
ACTOR Future<Void> commitAndUpdateMemory( ReadYourWritesTransaction *tr, WriteDuringReadWorkload* self, bool *cancelled, bool readYourWritesDisabled, bool snapshotRYWDisabled, bool readAheadDisabled, bool useBatchPriority, bool* doingCommit, double* startTime, Key timebombStr ) {
state UID randomID = g_nondeterministic_random->randomUniqueID();
//TraceEvent("WDRCommit", randomID);
try {
@ -407,6 +407,8 @@ struct WriteDuringReadWorkload : TestWorkload {
tr->setOption(FDBTransactionOptions::SNAPSHOT_RYW_DISABLE);
if(readAheadDisabled)
tr->setOption(FDBTransactionOptions::READ_AHEAD_DISABLE);
if(useBatchPriority)
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
if(self->useSystemKeys)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->addWriteConflictRange( self->conflictRange );
@ -574,6 +576,7 @@ struct WriteDuringReadWorkload : TestWorkload {
state bool readYourWritesDisabled = g_random->random01() < 0.5;
state bool readAheadDisabled = g_random->random01() < 0.5;
state bool snapshotRYWDisabled = g_random->random01() < 0.5;
state bool useBatchPriority = g_random->random01() < 0.5;
state int64_t timebomb = g_random->random01() < 0.01 ? g_random->randomInt64(1, 6000) : 0;
state std::vector<Future<Void>> operations;
state ActorCollection commits(false);
@ -614,6 +617,8 @@ struct WriteDuringReadWorkload : TestWorkload {
tr.setOption( FDBTransactionOptions::SNAPSHOT_RYW_DISABLE );
if( readAheadDisabled )
tr.setOption( FDBTransactionOptions::READ_AHEAD_DISABLE );
if( useBatchPriority )
tr.setOption( FDBTransactionOptions::PRIORITY_BATCH );
if( self->useSystemKeys )
tr.setOption( FDBTransactionOptions::ACCESS_SYSTEM_KEYS );
tr.setOption( FDBTransactionOptions::TIMEOUT, timebombStr );
@ -647,7 +652,7 @@ struct WriteDuringReadWorkload : TestWorkload {
g_random->random01() > 0.5, readYourWritesDisabled, snapshotRYWDisabled, self, &doingCommit, &memLimit ) );
} else if( operationType == 3 && !disableCommit ) {
if( !self->rarelyCommit || g_random->random01() < 1.0 / self->numOps ) {
Future<Void> commit = self->commitAndUpdateMemory( &tr, self, &cancelled, readYourWritesDisabled, snapshotRYWDisabled, readAheadDisabled, &doingCommit, &startTime, timebombStr );
Future<Void> commit = self->commitAndUpdateMemory( &tr, self, &cancelled, readYourWritesDisabled, snapshotRYWDisabled, readAheadDisabled, useBatchPriority, &doingCommit, &startTime, timebombStr );
operations.push_back( commit );
commits.add( commit );
}

View File

@ -55,7 +55,7 @@ using namespace boost::asio::ip;
//
// xyzdev
// vvvv
const uint64_t currentProtocolVersion = 0x0FDB00B061030001LL;
const uint64_t currentProtocolVersion = 0x0FDB00B061040001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion = 0x0FDB00A200060001LL;

View File

@ -41,6 +41,14 @@ void systemMonitor() {
customSystemMonitor("ProcessMetrics", &statState, true );
}
// Convenience overload: samples system statistics using the data folder and IP
// recorded in machineState (an empty folder / 0 when they are not present) and
// a function-local StatisticsState that persists across calls.
SystemStatistics getSystemStatistics() {
	static StatisticsState statState = StatisticsState();

	auto dataFolder = machineState.folder.present() ? machineState.folder.get() : "";
	auto machineIp = machineState.ip.present() ? machineState.ip.get() : 0;

	return getSystemStatistics(dataFolder, machineIp, &statState.systemState);
}
#define TRACEALLOCATOR( size ) TraceEvent("MemSample").detail("Count", FastAllocator<size>::getApproximateMemoryUnused()/size).detail("TotalSize", FastAllocator<size>::getApproximateMemoryUnused()).detail("SampleCount", 1).detail("Hash", "FastAllocatedUnused" #size ).detail("Bt", "na")
#define DETAILALLOCATORMEMUSAGE( size ) detail("TotalMemory"#size, FastAllocator<size>::getTotalMemory()).detail("ApproximateUnusedMemory"#size, FastAllocator<size>::getApproximateMemoryUnused()).detail("ActiveThreads"#size, FastAllocator<size>::getActiveThreads())

View File

@ -133,5 +133,6 @@ struct StatisticsState {
void systemMonitor();
SystemStatistics customSystemMonitor(std::string eventName, StatisticsState *statState, bool machineMetrics = false);
SystemStatistics getSystemStatistics();
#endif /* FLOW_SYSTEM_MONITOR_H */

View File

@ -1120,10 +1120,95 @@ std::string TraceEventFields::getValue(std::string key) const {
return value;
}
else {
TraceEvent ev(SevWarn, "TraceEventFieldNotFound");
if(tryGetValue("Type", value)) {
ev.detail("Event", value);
}
ev.detail("FieldName", key);
throw attribute_not_found();
}
}
namespace {

// Parses `s` as a double into outValue. Unless `permissive` is set, the whole
// string must be consumed by the conversion. Throws attribute_not_found when
// the text cannot be parsed under those rules.
void parseNumericValue(std::string const& s, double &outValue, bool permissive = false) {
	double parsed = 0;
	int charsRead = 0;
	const int matched = sscanf(s.c_str(), "%lf%n", &parsed, &charsRead);
	if (matched != 1 || !(charsRead == s.size() || permissive)) {
		throw attribute_not_found();
	}
	outValue = parsed;
}

// Parses `s` as an int into outValue. The text is first read as a long long;
// a value outside the int range throws attribute_too_large, and text that
// cannot be parsed (or has trailing characters when not permissive) throws
// attribute_not_found.
void parseNumericValue(std::string const& s, int &outValue, bool permissive = false) {
	long long int parsed = 0;
	int charsRead = 0;
	const int matched = sscanf(s.c_str(), "%lld%n", &parsed, &charsRead);
	if (matched != 1 || !(charsRead == s.size() || permissive)) {
		throw attribute_not_found();
	}
	if (parsed < std::numeric_limits<int>::min() || parsed > std::numeric_limits<int>::max()) {
		throw attribute_too_large();
	}
	outValue = static_cast<int>(parsed); // range verified above, so the narrowing is safe
}

// Parses `s` as a 64-bit integer into outValue. Unless `permissive` is set,
// the whole string must be consumed. Throws attribute_not_found otherwise.
void parseNumericValue(std::string const& s, int64_t &outValue, bool permissive = false) {
	long long int parsed = 0;
	int charsRead = 0;
	const int matched = sscanf(s.c_str(), "%lld%n", &parsed, &charsRead);
	if (matched != 1 || !(charsRead == s.size() || permissive)) {
		throw attribute_not_found();
	}
	outValue = parsed;
}

// Looks up `key` in `fields` and converts its text to T through the matching
// parseNumericValue overload. A conversion failure is traced as a SevWarn
// "ErrorParsingNumericTraceEventField" event (with the event type when the
// fields carry one, plus the field name and raw value) and then rethrown.
// Errors raised by the lookup itself propagate without that trace.
template<class T>
T getNumericValue(TraceEventFields const& fields, std::string key, bool permissive) {
	std::string rawText = fields.getValue(key);

	try {
		T result;
		parseNumericValue(rawText, result, permissive);
		return result;
	}
	catch(Error &e) {
		std::string eventType;
		TraceEvent warning(SevWarn, "ErrorParsingNumericTraceEventField");
		warning.error(e);
		if(fields.tryGetValue("Type", eventType)) {
			warning.detail("Event", eventType);
		}
		warning.detail("FieldName", key);
		warning.detail("FieldValue", rawText);
		throw;
	}
}

} // namespace
// Returns the field named `key` converted to int via getNumericValue<int>;
// `permissive` is forwarded to the conversion unchanged.
int TraceEventFields::getInt(std::string key, bool permissive) const {
	const int parsed = getNumericValue<int>(*this, key, permissive);
	return parsed;
}
// Returns the field named `key` converted to int64_t via
// getNumericValue<int64_t>; `permissive` is forwarded unchanged.
int64_t TraceEventFields::getInt64(std::string key, bool permissive) const {
	const int64_t parsed = getNumericValue<int64_t>(*this, key, permissive);
	return parsed;
}
// Returns the field named `key` converted to double via
// getNumericValue<double>; `permissive` is forwarded unchanged.
double TraceEventFields::getDouble(std::string key, bool permissive) const {
	const double parsed = getNumericValue<double>(*this, key, permissive);
	return parsed;
}
std::string TraceEventFields::toString() const {
std::string str;
bool first = true;

View File

@ -71,6 +71,9 @@ public:
const Field &operator[] (int index) const;
bool tryGetValue(std::string key, std::string &outValue) const;
std::string getValue(std::string key) const;
int getInt(std::string key, bool permissive=false) const;
int64_t getInt64(std::string key, bool permissive=false) const;
double getDouble(std::string key, bool permissive=false) const;
std::string toString() const;
void validateFormat() const;

View File

@ -170,6 +170,27 @@ inline void load( Archive& ar, std::set<T>& value ) {
ASSERT( ar.protocolVersion() != 0 );
}
// Serializes a std::map: writes the element count as an int, then each key
// followed by its value, in the map's iteration order.
template <class Archive, class K, class V>
inline void save( Archive& ar, const std::map<K, V>& value ) {
	ar << static_cast<int>(value.size());
	for (auto entry = value.begin(); entry != value.end(); ++entry) {
		ar << entry->first << entry->second;
	}
	ASSERT( ar.protocolVersion() != 0 );
}
// Deserializes a std::map written by the matching save(): reads the element
// count as an int, clears `value`, then reads that many key/value pairs and
// inserts them. A key that repeats in the stream keeps its first value.
template <class Archive, class K, class V>
inline void load( Archive& ar, std::map<K, V>& value ) {
	int count;
	ar >> count;
	value.clear();
	for (int i = 0; i < count; ++i) {
		std::pair<K, V> entry;
		ar >> entry.first >> entry.second;
		// Move the decoded pair into the map instead of copying it. save()
		// writes keys in ascending order, so hinting at end() makes each
		// insertion amortized constant time; out-of-order input is still
		// inserted correctly, just without the speed-up.
		value.emplace_hint(value.end(), std::move(entry));
	}
	ASSERT( ar.protocolVersion() != 0 );
}
#pragma intrinsic (memcpy)
#if VALGRIND

View File

@ -66,6 +66,7 @@ add_fdb_test(TEST_FILES SlowTask.txt IGNORE)
add_fdb_test(TEST_FILES SpecificUnitTest.txt IGNORE)
add_fdb_test(TEST_FILES StreamingWrite.txt IGNORE)
add_fdb_test(TEST_FILES ThreadSafety.txt IGNORE)
add_fdb_test(TEST_FILES Throttling.txt IGNORE)
add_fdb_test(TEST_FILES TraceEventMetrics.txt IGNORE)
add_fdb_test(TEST_FILES default.txt IGNORE)
add_fdb_test(TEST_FILES errors.txt IGNORE)

10
tests/Throttling.txt Normal file
View File

@ -0,0 +1,10 @@
testTitle=Throttling
testName=Throttling
testDuration=60.0
healthMetricsCheckInterval=1.0
actorsPerClient=10
readsPerTransaction=10
writesPerTransaction=10
throttlingMultiplier=0.5
sendDetailedHealthMetrics=true
maxBurst=10000