Checkpoint: Ratekeeper sets absolute limits for tag throttles and enforces them by distributing requests to proxies, who distribute them to clients.

A few refactorings.
This commit is contained in:
A.J. Beamon 2020-04-16 14:43:22 -07:00
parent 7f3fa00897
commit 0fba8c47be
15 changed files with 280 additions and 138 deletions

View File

@ -3808,15 +3808,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
}
std::map<Standalone<StringRef>, TagThrottleInfo> tags = wait(ThrottleApi::getTags(db, throttleListLimit));
std::map<TransactionTag, TagThrottleInfo> tags = wait(ThrottleApi::getTags(db, throttleListLimit));
if(tags.size() > 0) {
printf("Throttled tags:\n\n");
printf(" Rate | Expiration (s) | Priority | Type | Tag\n");
printf(" ------+----------------+-----------+--------+------------------\n");
printf(" Rate (txn/s) | Expiration (s) | Priority | Type | Tag\n");
printf(" --------------+----------------+-----------+--------+------------------\n");
for(auto itr = tags.begin(); itr != tags.end(); ++itr) {
printf(" %3d%% | %13ds | %9s | %6s | %s\n",
(int)(itr->second.rate*100),
printf(" %12d | %13ds | %9s | %6s | %s\n",
(int)(itr->second.tpsRate),
(int)(itr->second.expiration-now()),
TagThrottleInfo::priorityToString(itr->second.priority, false),
itr->second.autoThrottled ? "auto" : "manual",
@ -3837,25 +3837,25 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
printf("Usage: throttle on tag <TAG> [RATE] [DURATION]\n");
printf("\n");
printf("Enables throttling for transactions with the specified tag.\n");
printf("An optional throttling rate (out of 1.0) can be specified (default 1.0).\n");
printf("An optional transactions per second rate can be specified (default 0).\n");
printf("An optional duration can be specified, which must include a time suffix (s, m, h, d) (default 1h).\n");
is_error = true;
continue;
}
double rate = 1.0;
double tpsRate = 0.0;
uint64_t duration = 3600;
if(tokens.size() >= 5) {
char *end;
rate = std::strtod((const char*)tokens[4].begin(), &end);
tpsRate = std::strtod((const char*)tokens[4].begin(), &end);
if((tokens.size() > 5 && !std::isspace(*end)) || (tokens.size() == 5 && *end != '\0')) {
printf("ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str());
is_error = true;
continue;
}
if(rate <= 0 || rate > 1) {
printf("ERROR: invalid rate `%f'; must satisfy 0 < rate <= 1\n", rate);
if(tpsRate < 0) {
printf("ERROR: rate cannot be negative `%f'\n", tpsRate);
is_error = true;
continue;
}
@ -3870,7 +3870,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
duration = parsedDuration.get();
}
wait(ThrottleApi::throttleTag(db, tokens[3], rate, now()+duration, true, false));
wait(ThrottleApi::throttleTag(db, tokens[3], tpsRate, now()+duration, true, false));
printf("Tag `%s' has been throttled\n", tokens[3].toString().c_str());
}
else if(tokencmp(tokens[1], "off")) {

View File

@ -30,6 +30,7 @@
#include "flow/TDMetric.actor.h"
#include "fdbclient/EventTypes.actor.h"
#include "fdbrpc/ContinuousSample.h"
#include "fdbrpc/Smoother.h"
class StorageServerInfo : public ReferencedInterface<StorageServerInterface> {
public:
@ -45,6 +46,51 @@ private:
typedef MultiInterface<ReferencedInterface<StorageServerInterface>> LocationInfo;
typedef MultiInterface<MasterProxyInterface> ProxyInfo;
class ClientTagThrottleData {
private:
double tpsRate;
Smoother smoothRate;
Smoother smoothReleased;
public:
double expiration;
ClientTagThrottleData(double tpsRate, double expiration)
: tpsRate(tpsRate), expiration(expiration), smoothRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW),
smoothReleased(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW)
{
ASSERT(tpsRate >= 0);
smoothRate.reset(tpsRate);
}
void updateRate(double tpsRate) {
ASSERT(tpsRate >= 0);
this->tpsRate = tpsRate;
smoothRate.setTotal(tpsRate);
}
void addReleased(int released) {
smoothReleased.addDelta(released);
}
double throttleDuration() {
if(expiration <= now()) {
return 0.0;
}
double capacity = (smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
if(capacity >= 1) {
return 0.0;
}
if(tpsRate == 0) {
return std::max(0.0, expiration - now());
}
return std::min(expiration - now(), capacity / tpsRate);
}
};
class DatabaseContext : public ReferenceCounted<DatabaseContext>, public FastAllocated<DatabaseContext>, NonCopyable {
public:
static DatabaseContext* allocateOnForeignThread() {
@ -167,7 +213,7 @@ public:
UID dbId;
bool internal; // Only contexts created through the C client and fdbcli are non-internal
PrioritizedTagThrottleMap throttledTags;
PrioritizedTagThrottleMap<ClientTagThrottleData> throttledTags;
CounterCollection cc;

View File

@ -220,6 +220,7 @@ ClientKnobs::ClientKnobs(bool randomize) {
init( MAX_TAGS_PER_TRANSACTION, 100 );
init( MAX_TRANSACTION_TAG_LENGTH, 100 );
init( READ_TAG_SAMPLE_RATE, 1.0 ); // Communicated to clients from cluster
init( TAG_THROTTLE_SMOOTHING_WINDOW, 5.0 );
// clang-format on
}

View File

@ -209,6 +209,7 @@ public:
int MAX_TRANSACTION_TAG_LENGTH;
int MAX_TAGS_PER_TRANSACTION;
double READ_TAG_SAMPLE_RATE; // Communicated to clients from cluster
double TAG_THROTTLE_SMOOTHING_WINDOW;
ClientKnobs(bool randomize = false);
};

View File

@ -165,7 +165,7 @@ struct GetReadVersionReply {
bool locked;
Optional<Value> metadataVersion;
TagThrottleMap tagThrottleInfo;
TagThrottleMap<TagThrottleInfo> tagThrottleInfo;
GetReadVersionReply() : version(invalidVersion), locked(false) {}
@ -191,12 +191,14 @@ struct GetReadVersionRequest : TimedRequest {
uint32_t transactionCount;
uint32_t flags;
TagSet tags;
std::map<TransactionTag, uint32_t> tags; // TODO: compact
Optional<UID> debugID;
ReplyPromise<GetReadVersionReply> reply;
GetReadVersionRequest() : transactionCount( 1 ), flags( PRIORITY_DEFAULT ) {}
GetReadVersionRequest( uint32_t transactionCount, uint32_t flags, TagSet tags = TagSet(), Optional<UID> debugID = Optional<UID>() )
GetReadVersionRequest( uint32_t transactionCount, uint32_t flags, std::map<TransactionTag, uint32_t> tags = std::map<TransactionTag, uint32_t>(), Optional<UID> debugID = Optional<UID>() )
: transactionCount( transactionCount ), flags( flags ), tags(tags), debugID( debugID ) {}
int priority() const { return flags & FLAG_PRIORITY_MASK; }

View File

@ -3169,7 +3169,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
}
}
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, TagSet tags, Optional<UID> debugID ) {
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, std::map<TransactionTag, uint32_t> tags, Optional<UID> debugID ) {
try {
++cx->transactionReadVersionBatches;
if( debugID.present() )
@ -3180,8 +3180,12 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
auto &priorityThrottledTags = cx->throttledTags[TagThrottleInfo::priorityFromReadVersionFlags(flags)];
for(auto tag : v.tagThrottleInfo) {
priorityThrottledTags[tag.first] = tag.second;
for(auto tag : v.tagThrottleInfo) { // TODO: remove any that aren't sent back?
auto result = priorityThrottledTags.try_emplace(tag.first, tag.second.tpsRate, tag.second.expiration);
if(!result.second) {
result.first->second.updateRate(tag.second.tpsRate);
result.first->second.expiration = tag.second.expiration;
}
}
if( debugID.present() )
@ -3207,7 +3211,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
state Optional<UID> debugID;
state bool send_batch;
state TagSet tags;
state std::map<TransactionTag, uint32_t> tags;
// dynamic batching
state PromiseStream<double> replyTimes;
@ -3226,7 +3230,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
requests.push_back(req.reply);
for(auto tag : req.tags) {
tags.addTag(tag);
++tags[tag];
}
if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
@ -3254,7 +3258,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
getConsistentReadVersion(cx, count, flags, tags, std::move(debugID)),
std::vector<Promise<GetReadVersionReply>>(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
tags = TagSet();
tags.clear();
debugID = Optional<UID>();
requests = std::vector< Promise<GetReadVersionReply> >();
addActor.send(batch);
@ -3263,7 +3267,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
}
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, TagSet tags, double throttledRate) {
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, TagSet tags) {
GetReadVersionReply rep = wait(f);
double latency = now() - startTime;
cx->GRVLatencies.addSample(latency);
@ -3278,23 +3282,24 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Re
++cx->transactionReadVersionsCompleted;
auto &priorityThrottledTags = cx->throttledTags[TagThrottleInfo::priorityFromReadVersionFlags(flags)];
double newMaxThrottle = throttledRate;
for(auto tag : tags) {
auto itr = priorityThrottledTags.find(tag);
if(itr != priorityThrottledTags.end()) {
if(itr->second.expiration > now()) {
newMaxThrottle = std::max(newMaxThrottle, itr->second.rate);
}
else {
if(itr->second.expiration <= now()) {
priorityThrottledTags.erase(itr);
}
else if(itr->second.throttleDuration() > 0) {
++cx->transactionReadVersionsThrottled;
throw tag_throttled();
}
}
}
// If the throttle rate is higher now than when we made the request, we may still need to throttle this response
if(newMaxThrottle > throttledRate && deterministicRandom()->random01() < 1.0 - (1.0-newMaxThrottle)/(1.0-throttledRate)) {
++cx->transactionReadVersionsThrottled;
throw tag_throttled();
for(auto tag : tags) {
auto itr = priorityThrottledTags.find(tag);
if(itr != priorityThrottledTags.end()) {
itr->second.addReleased(1);
}
}
if(rep.version > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
@ -3309,32 +3314,28 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Re
Future<Version> Transaction::getReadVersion(uint32_t flags) {
if (!readVersion.isValid()) {
++cx->transactionReadVersions;
TagThrottleInfo::Priority priority;
flags |= options.getReadVersionFlags;
if((flags & GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
priority = TagThrottleInfo::Priority::IMMEDIATE;
++cx->transactionImmediateReadVersions;
}
else if((flags & GetReadVersionRequest::PRIORITY_DEFAULT) == GetReadVersionRequest::PRIORITY_DEFAULT) {
priority = TagThrottleInfo::Priority::DEFAULT;
++cx->transactionDefaultReadVersions;
}
else if((flags & GetReadVersionRequest::PRIORITY_BATCH) == GetReadVersionRequest::PRIORITY_BATCH) {
priority = TagThrottleInfo::Priority::BATCH;
++cx->transactionBatchReadVersions;
}
else {
ASSERT(false);
}
double maxThrottle = 0;
double maxThrottleDelay = 0.0;
if(options.tags.size() != 0) {
auto priorityThrottledTags = cx->throttledTags[priority];
auto priorityThrottledTags = cx->throttledTags[TagThrottleInfo::priorityFromReadVersionFlags(flags)];
for(auto tag : options.tags) {
auto itr = priorityThrottledTags.find(tag);
if(itr != priorityThrottledTags.end()) {
if(itr->second.expiration > now()) {
maxThrottle = std::max(maxThrottle, itr->second.rate);
maxThrottleDelay = std::max(maxThrottleDelay, itr->second.throttleDuration());
}
else {
priorityThrottledTags.erase(itr);
@ -3343,8 +3344,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
}
}
// TODO: can we protect against client spamming this request (e.g. set absolute limits or delay for all transactions starting with throttle tag)
if(maxThrottle > 0 && deterministicRandom()->random01() < maxThrottle) {
if(maxThrottleDelay > 0.0) { // TODO: allow delaying?
++cx->transactionReadVersionsThrottled;
return Future<Version>(tag_throttled());
}
@ -3357,7 +3357,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags, maxThrottle);
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags);
}
return readVersion;
}

View File

@ -429,7 +429,7 @@ struct StorageQueuingMetricsReply {
double cpuUsage;
double diskUsage;
double localRateLimit;
Optional<Standalone<StringRef>> busiestTag;
Optional<TransactionTag> busiestTag;
double busiestTagFractionalBusyness;
double busiestTagRate;

View File

@ -24,7 +24,7 @@
#include "flow/actorcompiler.h" // has to be last include
void TagSet::addTag(StringRef tag) {
void TagSet::addTag(TransactionTagRef tag) {
ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); // Tag length is encoded with a single byte
if(tag.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) {
@ -34,7 +34,7 @@ void TagSet::addTag(StringRef tag) {
throw too_many_tags();
}
tags.insert(StringRef(arena, tag));
tags.insert(TransactionTagRef(arena, tag));
bytes += tag.size();
}
@ -70,7 +70,7 @@ namespace ThrottleApi {
// The tags are listed in sorted order
//
// Currently, the throttle API supports only 1 tag per throttle
Standalone<StringRef> throttleKeyForTags(std::set<StringRef> const& tags) {
Key throttleKeyForTags(std::set<TransactionTagRef> const& tags) {
ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256);
ASSERT(tags.size() > 0);
@ -101,19 +101,19 @@ namespace ThrottleApi {
return result;
}
StringRef tagFromThrottleKey(KeyRef key) {
StringRef tag = key.substr(tagThrottleKeysPrefix.size()+1);
TransactionTagRef tagFromThrottleKey(KeyRef key) {
TransactionTagRef tag = key.substr(tagThrottleKeysPrefix.size()+1);
ASSERT(tag.size() == key.begin()[tagThrottleKeysPrefix.size()]); // TODO: support multiple tags per throttle
return tag;
}
ACTOR Future<std::map<Standalone<StringRef>, TagThrottleInfo>> getTags(Database db, int limit) {
ACTOR Future<std::map<TransactionTag, TagThrottleInfo>> getTags(Database db, int limit) {
state Transaction tr(db);
loop {
try {
Standalone<RangeResultRef> tags = wait(tr.getRange(tagThrottleKeys, limit));
std::map<Standalone<StringRef>, TagThrottleInfo> results;
std::map<TransactionTag, TagThrottleInfo> results;
for(auto tag : tags) {
results[tagFromThrottleKey(tag.key)] = decodeTagThrottleValue(tag.value);
}
@ -125,11 +125,11 @@ namespace ThrottleApi {
}
}
ACTOR Future<Void> throttleTag(Database db, Standalone<StringRef> tag, double rate, double expiration, bool serializeExpirationAsDuration, bool autoThrottled) {
ACTOR Future<Void> throttleTag(Database db, TransactionTagRef tag, double tpsRate, double expiration, bool serializeExpirationAsDuration, bool autoThrottled) {
state Transaction tr(db);
state Key key = throttleKeyForTags(std::set<StringRef>{ tag });
state Key key = throttleKeyForTags(std::set<TransactionTagRef>{ tag });
TagThrottleInfo throttle(rate, expiration, autoThrottled, TagThrottleInfo::Priority::DEFAULT, serializeExpirationAsDuration);
TagThrottleInfo throttle(tpsRate, expiration, autoThrottled, TagThrottleInfo::Priority::DEFAULT, serializeExpirationAsDuration);
BinaryWriter wr(IncludeVersion());
wr << throttle;
state Value value = wr.toValue();
@ -151,9 +151,9 @@ namespace ThrottleApi {
}
}
ACTOR Future<bool> unthrottleTag(Database db, Standalone<StringRef> tag) {
ACTOR Future<bool> unthrottleTag(Database db, TransactionTagRef tag) {
state Transaction tr(db);
state Key key = throttleKeyForTags(std::set<StringRef>{ tag });
state Key key = throttleKeyForTags(std::set<TransactionTagRef>{ tag });
loop {
try {

View File

@ -32,6 +32,9 @@
class Database;
typedef StringRef TransactionTagRef;
typedef Standalone<TransactionTagRef> TransactionTag;
struct TagThrottleInfo {
enum class Priority {
BATCH,
@ -55,22 +58,22 @@ struct TagThrottleInfo {
static Priority priorityFromReadVersionFlags(int flags);
double rate;
double tpsRate;
double expiration;
bool autoThrottled;
Priority priority;
bool serializeExpirationAsDuration;
TagThrottleInfo() : rate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT), serializeExpirationAsDuration(true) {}
TagThrottleInfo(double rate, double expiration, bool autoThrottled, Priority priority, bool serializeExpirationAsDuration)
: rate(rate), expiration(expiration), autoThrottled(autoThrottled), priority(priority),
TagThrottleInfo() : tpsRate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT), serializeExpirationAsDuration(true) {}
TagThrottleInfo(double tpsRate, double expiration, bool autoThrottled, Priority priority, bool serializeExpirationAsDuration)
: tpsRate(tpsRate), expiration(expiration), autoThrottled(autoThrottled), priority(priority),
serializeExpirationAsDuration(serializeExpirationAsDuration) {}
template<class Ar>
void serialize(Ar& ar) {
if(ar.isDeserializing) {
serializer(ar, rate, expiration, autoThrottled, priority, serializeExpirationAsDuration);
serializer(ar, tpsRate, expiration, autoThrottled, priority, serializeExpirationAsDuration);
if(serializeExpirationAsDuration) {
expiration += now();
}
@ -81,7 +84,7 @@ struct TagThrottleInfo {
serializedExpiration = std::max(expiration - now(), 0.0);
}
serializer(ar, rate, serializedExpiration, autoThrottled, priority, serializeExpirationAsDuration);
serializer(ar, tpsRate, serializedExpiration, autoThrottled, priority, serializeExpirationAsDuration);
}
}
};
@ -90,11 +93,11 @@ BINARY_SERIALIZABLE(TagThrottleInfo::Priority);
class TagSet {
public:
typedef std::set<StringRef>::const_iterator const_iterator;
typedef std::set<TransactionTagRef>::const_iterator const_iterator;
TagSet() : bytes(0) {}
void addTag(StringRef tag);
void addTag(TransactionTagRef tag);
size_t size();
const_iterator begin() const {
@ -107,7 +110,7 @@ public:
//private:
Arena arena; // TODO: where to hold this memory?
std::set<StringRef> tags;
std::set<TransactionTagRef> tags;
size_t bytes;
};
@ -139,7 +142,7 @@ struct dynamic_size_traits<TagSet> : std::true_type {
while(data < end) {
uint8_t len = *data;
++data;
StringRef tag(context.tryReadZeroCopy(data, len), len);
TransactionTagRef tag(context.tryReadZeroCopy(data, len), len);
data += len;
t.tags.insert(tag);
@ -150,20 +153,23 @@ struct dynamic_size_traits<TagSet> : std::true_type {
}
};
typedef std::unordered_map<Standalone<StringRef>, TagThrottleInfo, std::hash<StringRef>> TagThrottleMap;
typedef std::map<TagThrottleInfo::Priority, TagThrottleMap> PrioritizedTagThrottleMap;
template<class Value>
using TagThrottleMap = std::unordered_map<TransactionTag, Value, std::hash<TransactionTagRef>>;
template<class Value>
using PrioritizedTagThrottleMap = std::map<TagThrottleInfo::Priority, TagThrottleMap<Value>>;
namespace ThrottleApi {
// Currently, only 1 tag in a key is supported
Standalone<StringRef> throttleKeyForTags(std::set<StringRef> const& tags);
StringRef tagFromThrottleKey(KeyRef key);
Key throttleKeyForTags(std::set<TransactionTagRef> const& tags);
TransactionTagRef tagFromThrottleKey(KeyRef key);
Future<std::map<Standalone<StringRef>, TagThrottleInfo>> getTags(Database const& db, int const& limit);
Future<std::map<TransactionTag, TagThrottleInfo>> getTags(Database const& db, int const& limit);
Future<Void> throttleTag(Database const& db, Standalone<StringRef> const& tag, double const& rate, double const& expiration,
Future<Void> throttleTag(Database const& db, TransactionTagRef const& tag, double const& tpsRate, double const& expiration,
bool const& serializeExpirationAsDuration, bool const& autoThrottled); // TODO: priorities
Future<bool> unthrottleTag(Database const& db, Standalone<StringRef> const& tag);
Future<bool> unthrottleTag(Database const& db, TransactionTagRef const& tag);
Future<uint64_t> unthrottleManual(Database db);
Future<uint64_t> unthrottleAuto(Database db);

View File

@ -516,7 +516,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND );
init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 );
init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0;
init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 1.0;
init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0;
//Wait Failure
init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

View File

@ -384,7 +384,7 @@ public:
double TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
int64_t MAX_THROTTLED_TAGS;
int64_t MIN_TAG_BUSYNESS;
double MIN_TAG_BUSYNESS;
double TAG_THROTTLE_DURATION;
bool AUTO_TAG_THROTTLING_ENABLED;
@ -455,7 +455,7 @@ public:
int64_t BEHIND_CHECK_VERSIONS;
double WAIT_METRICS_WRONG_SHARD_CHANCE;
int64_t MIN_TAG_PAGES_READ_RATE;
double TAG_MEASUREMENT_INTERVAL;
double READ_TAG_MEASUREMENT_INTERVAL;
//Wait Failure
int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS;

View File

@ -99,7 +99,7 @@ struct ProxyStats {
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply,
PrioritizedTagThrottleMap *throttledTags) {
TagThrottleMap<uint64_t>* transactionTagCounter, PrioritizedTagThrottleMap<TagThrottleInfo>* throttledTags) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -123,7 +123,8 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
when ( wait( nextRequestTimer ) ) {
nextRequestTimer = Never();
bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE;
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed)));
reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, detailed)));
transactionTagCounter->clear();
expectingDetailedReply = detailed;
}
when ( GetRateInfoReply rep = wait(reply) ) {
@ -140,7 +141,12 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
lastDetailedReply = now();
}
*throttledTags = std::move(rep.throttledTags);
for(auto priorityTags : rep.throttledTags) {
auto &localPriorityTags = (*throttledTags)[priorityTags.first];
for(auto tag : priorityTags.second) { // TODO: remove missing tags
localPriorityTags[tag.first] = tag.second;
}
}
}
when ( wait( leaseTimeout ) ) {
*outTransactionRate = 0;
@ -180,7 +186,9 @@ ACTOR Future<Void> queueTransactionStartRequests(
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
ProxyStats* stats, TransactionRateInfo* batchRateInfo)
ProxyStats* stats, TransactionRateInfo* batchRateInfo,
PrioritizedTagThrottleMap<TagThrottleInfo>* throttledTags,
TagThrottleMap<uint64_t>* transactionTagCounter)
{
loop choose{
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
@ -194,6 +202,11 @@ ACTOR Future<Void> queueTransactionStartRequests(
req.reply.send(rep);
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
} else {
// TODO: this probably needs to happen outside the high priority path
for(auto tag : req.tags) {
transactionTagCounter[tag] += req.transactionCount;
}
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
@ -1292,7 +1305,7 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
}
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests,
ProxyStats* stats, Version minKnownCommittedVersion, PrioritizedTagThrottleMap throttledTags) {
ProxyStats* stats, Version minKnownCommittedVersion, PrioritizedTagThrottleMap<TagThrottleInfo> throttledTags) {
GetReadVersionReply _baseReply = wait(replyFuture);
GetReadVersionReply baseReply = _baseReply;
double end = g_network->timer();
@ -1315,13 +1328,13 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
auto itr = throttledTags.find(TagThrottleInfo::priorityFromReadVersionFlags(request.flags));
ASSERT(itr != throttledTags.end());
auto tagItr = itr->second.find(tag);
auto tagItr = itr->second.find(tag.first);
while(tagItr == itr->second.end() && itr != throttledTags.begin()) {
--itr;
}
if(tagItr != itr->second.end()) {
reply.tagThrottleInfo[tag] = tagItr->second;
reply.tagThrottleInfo[tag.first] = tagItr->second;
}
}
@ -1353,12 +1366,13 @@ ACTOR static Future<Void> transactionStarter(
state Deque<GetReadVersionRequest> batchQueue;
state vector<MasterProxyInterface> otherProxies;
state PrioritizedTagThrottleMap throttledTags;
state TagThrottleMap<uint64_t> transactionTagCounter;
state PrioritizedTagThrottleMap<TagThrottleInfo> throttledTags;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, &throttledTags));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo));
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo, &throttledTags, &transactionTagCounter));
// Get a list of the other proxies that go together with us
while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end())

View File

@ -96,7 +96,7 @@ struct StorageQueueInfo {
Smoother smoothTotalSpace;
limitReason_t limitReason;
Optional<Standalone<StringRef>> busiestTag;
Optional<TransactionTag> busiestTag;
double busiestTagFractionalBusyness;
double busiestTagRate;
@ -128,6 +128,58 @@ struct TLogQueueInfo {
}
};
class RkTagThrottleData : NonCopyable {
private:
Smoother clientRate;
Smoother smoothRate;
Smoother smoothRequests;
public:
double tpsRate;
double expiration;
int64_t count;
RkTagThrottleData(double tpsRate, double expiration)
: tpsRate(tpsRate), expiration(expiration), smoothRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW),
smoothRequests(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW), clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW), count(0)
{
ASSERT(tpsRate >= 0);
smoothRate.reset(tpsRate);
clientRate.reset(tpsRate);
}
void updateClientRate() {
double startClientRate = clientRate.smoothTotal();
if(tpsRate == 0.0) {
clientRate.reset(0.0);
}
else if(smoothRequests.smoothRate() == 0) {
clientRate.setTotal(smoothRate.smoothTotal());
}
else {
double targetRate = smoothRate.smoothTotal();
clientRate.setTotal(std::min(targetRate, (targetRate / smoothRequests.smoothRate()) * clientRate.smoothTotal()));
}
}
void updateRate(double tpsRate) {
ASSERT(tpsRate >= 0);
this->tpsRate = tpsRate;
smoothRate.setTotal(tpsRate);
updateClientRate();
}
void addRequests(int requests) {
smoothRequests.addDelta(requests);
updateClientRate();
}
double getClientRate() {
return clientRate.smoothTotal();
}
};
struct RatekeeperLimits {
double tpsLimit;
Int64MetricHandle tpsLimitMetric;
@ -187,7 +239,7 @@ struct RatekeeperData {
double lastWarning;
double lastSSListFetchedTimestamp;
PrioritizedTagThrottleMap tagThrottles;
PrioritizedTagThrottleMap<RkTagThrottleData> throttledTags;
RatekeeperLimits normalLimits;
RatekeeperLimits batchLimits;
@ -204,9 +256,9 @@ struct RatekeeperData {
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
autoThrottlingEnabled(false)
{
tagThrottles.try_emplace(TagThrottleInfo::Priority::IMMEDIATE);
tagThrottles.try_emplace(TagThrottleInfo::Priority::DEFAULT);
tagThrottles.try_emplace(TagThrottleInfo::Priority::BATCH);
throttledTags.try_emplace(TagThrottleInfo::Priority::IMMEDIATE);
throttledTags.try_emplace(TagThrottleInfo::Priority::DEFAULT);
throttledTags.try_emplace(TagThrottleInfo::Priority::BATCH);
}
};
@ -407,15 +459,11 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
}
TraceEvent("RatekeeperReadThrottles").detail("NumThrottledTags", throttledTags.get().size());
PrioritizedTagThrottleMap newThrottles;
std::map<TagThrottleInfo::Priority, std::pair<TagThrottleMap::iterator, TagThrottleMap::iterator>> oldThrottleIterators;
for(auto t : self->tagThrottles) {
oldThrottleIterators[t.first] = std::make_pair(t.second.begin(), t.second.end());
}
PrioritizedTagThrottleMap<RkTagThrottleData> updatedTagThrottles;
TraceEvent("RatekeeperReadThrottles").detail("NumThrottledTags", throttledTags.get().size());
for(auto entry : throttledTags.get()) {
StringRef tag = ThrottleApi::tagFromThrottleKey(entry.key);
TransactionTagRef tag = ThrottleApi::tagFromThrottleKey(entry.key);
TagThrottleInfo throttleInfo = ThrottleApi::decodeTagThrottleValue(entry.value);
TraceEvent("RatekeeperReadThrottleRead").detail("Tag", tag).detail("Expiration", throttleInfo.expiration);
if((!self->autoThrottlingEnabled && throttleInfo.autoThrottled) || throttleInfo.expiration <= now()) { // TODO: keep or delete auto throttles when disabling auto-throttling
@ -432,25 +480,38 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
tr.set(entry.key, value);
}
auto oldItr = oldThrottleIterators[throttleInfo.priority];
while(oldItr.first != oldItr.second && oldItr.first->first < tag) {
++oldItr.first;
}
if(oldItr.first == oldItr.second || oldItr.first->first != tag || oldItr.first->second.rate < throttleInfo.rate * 0.95) {
TraceEvent("RatekeeperDetectedThrottle")
auto &priorityThrottledTags = self->throttledTags[throttleInfo.priority];
auto itr = priorityThrottledTags.find(tag);
if(itr == priorityThrottledTags.end()) {
TraceEvent("RatekeeperAddingThrottle")
.detail("Tag", tag)
.detail("Rate", throttleInfo.rate)
.detail("Rate", throttleInfo.tpsRate)
.detail("Priority", TagThrottleInfo::priorityToString(throttleInfo.priority))
.detail("SecondsToExpiration", throttleInfo.expiration - now())
.detail("AutoThrottled", throttleInfo.autoThrottled);
}
newThrottles[throttleInfo.priority][tag] = throttleInfo;
updatedTagThrottles[throttleInfo.priority].try_emplace(tag, throttleInfo.tpsRate, throttleInfo.expiration);
}
else {
auto node = priorityThrottledTags.extract(itr);
if(node.mapped().tpsRate != throttleInfo.tpsRate || node.mapped().expiration != throttleInfo.expiration) {
TraceEvent("RatekeeperUpdatingThrottle")
.detail("Tag", tag)
.detail("Rate", throttleInfo.tpsRate)
.detail("Priority", TagThrottleInfo::priorityToString(throttleInfo.priority))
.detail("SecondsToExpiration", throttleInfo.expiration - now())
.detail("AutoThrottled", throttleInfo.autoThrottled);
node.mapped().updateRate(throttleInfo.tpsRate);
node.mapped().expiration = throttleInfo.expiration;
}
updatedTagThrottles[throttleInfo.priority].insert(std::move(node));
}
}
}
self->tagThrottles = newThrottles;
self->throttledTags = std::move(updatedTagThrottles);
state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
wait(tr.commit());
@ -464,7 +525,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TagThrottleMap& throttledTags) {
void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TagThrottleMap<RkTagThrottleData>& throttledTags) {
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
@ -532,27 +593,27 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TagThrottleMap&
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
// TODO: limit the number of throttles active for a storage server
if(storageQueue > targetBytes / 2.0 && ss.busiestTag.present() && ss.busiestTagFractionalBusyness > 0.2
/*if(storageQueue > targetBytes / 2.0 && ss.busiestTag.present() && ss.busiestTagFractionalBusyness > 0.2
&& ss.busiestTagRate > 1000 && throttledTags.size() < 10) {
auto &throttle = throttledTags[ss.busiestTag.get()];
double throttleRate = (storageQueue - targetBytes / 2.0) / targetBytes / 2.0;
if(throttle.expiration <= now() || throttle.rate < throttleRate * 0.95) {
if(throttle.expiration <= now() || throttle.tpsRate > throttleRate * 0.95) {
TraceEvent(format("RatekeeperThrottlingTag%s", limits->context).c_str())
.detail("Tag", ss.busiestTag.get())
.detail("ThrottleRate", throttleRate);
}
if(throttle.expiration <= now()) {
throttle.rate = throttleRate;
throttle.tpsRate = throttleRate;
}
else {
throttle.expiration = std::max(throttle.rate, throttleRate);
throttle.expiration = std::max(throttle.tpsRate, throttleRate);
}
throttle.expiration = now() + 120.0;
}
}*/
double inputRate = ss.smoothInputBytes.smoothRate();
//inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE );
@ -884,8 +945,8 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
state bool lastLimited = false;
loop choose {
when (wait( timeout )) {
updateRate(&self, &self.normalLimits, self.tagThrottles[TagThrottleInfo::Priority::DEFAULT]);
updateRate(&self, &self.batchLimits, self.tagThrottles[TagThrottleInfo::Priority::BATCH]);
updateRate(&self, &self.normalLimits, self.throttledTags[TagThrottleInfo::Priority::DEFAULT]);
updateRate(&self, &self.batchLimits, self.throttledTags[TagThrottleInfo::Priority::BATCH]);
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
double tooOld = now() - 1.0;
@ -917,16 +978,25 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
for(auto tag : req.throttledTagCounts) { // TODO: add only if this data is recent?
for(auto &priorityItr : self.throttledTags) { // TODO: we don't need this loop
auto itr = priorityItr.second.find(tag.first);
if(itr != priorityItr.second.end()) {
itr->second.addRequests(tag.second);
}
}
}
// TODO: avoid iteration every time
for(auto priorityItr = self.tagThrottles.begin(); priorityItr != self.tagThrottles.end(); ++priorityItr) {
auto &priorityTags = reply.throttledTags[priorityItr->first];
for(auto tagItr = priorityItr->second.begin(); tagItr != priorityItr->second.end();) {
for(auto &priorityItr : self.throttledTags) {
auto &priorityTags = reply.throttledTags[priorityItr.first];
for(auto tagItr = priorityItr.second.begin(); tagItr != priorityItr.second.end();) {
if(tagItr->second.expiration > now()) {
priorityTags[tagItr->first] = tagItr->second;
auto result = priorityTags.insert_or_assign(tagItr->first, TagThrottleInfo(tagItr->second.getClientRate(), tagItr->second.expiration, false, priorityItr.first, true)); // TODO: Different structure?
++tagItr;
}
else {
tagItr = priorityItr->second.erase(tagItr);
tagItr = priorityItr.second.erase(tagItr);
}
}
}

View File

@ -59,7 +59,7 @@ struct GetRateInfoReply {
double leaseDuration;
HealthMetrics healthMetrics;
PrioritizedTagThrottleMap throttledTags;
PrioritizedTagThrottleMap<TagThrottleInfo> throttledTags;
template <class Ar>
void serialize(Ar& ar) {
@ -72,16 +72,18 @@ struct GetRateInfoRequest {
UID requesterID;
int64_t totalReleasedTransactions;
int64_t batchReleasedTransactions;
TagThrottleMap<uint64_t> throttledTagCounts;
bool detailed;
ReplyPromise<struct GetRateInfoReply> reply;
GetRateInfoRequest() {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {}
GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, TagThrottleMap<uint64_t> throttledTagCounts, bool detailed)
: requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), throttledTagCounts(throttledTagCounts), detailed(detailed) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply);
serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply);
}
};

View File

@ -449,20 +449,20 @@ public:
return val;
}
struct TagCounters {
struct TransactionTagCounter {
struct TagInfo {
Standalone<StringRef> tag;
TransactionTag tag;
int64_t count;
double fractionalBusyness;
double elapsed;
TagInfo(Standalone<StringRef> const& tag, int64_t count, int64_t totalCount, double elapsed)
TagInfo(TransactionTag const& tag, int64_t count, int64_t totalCount, double elapsed)
: tag(tag), count(count), fractionalBusyness((double)count/totalCount), elapsed(elapsed) {}
};
std::unordered_map<Standalone<StringRef>, int64_t, std::hash<StringRef>> intervalCounts;
TagThrottleMap<int64_t> intervalCounts;
int64_t intervalTotalSampledCount = 0;
Standalone<StringRef> busiestTag;
TransactionTag busiestTag;
int64_t busiestTagCount = 0;
double intervalStart = 0;
@ -471,7 +471,7 @@ public:
void increment(Optional<TagSet> const& tags, int64_t delta) {
if(tags.present()) {
for(auto& tag : tags.get()) {
int64_t &count = intervalCounts[Standalone<StringRef>(tag, tags.get().arena)];
int64_t &count = intervalCounts[TransactionTag(tag, tags.get().arena)];
count += delta;
if(count > busiestTagCount) {
busiestTagCount = count;
@ -511,7 +511,7 @@ public:
}
};
TagCounters tagCounters;
TransactionTagCounter transactionTagCounter;
Optional<LatencyBandConfig> latencyBandConfig;
@ -973,7 +973,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
}
data->tagCounters.increment(req.tags, (resultSize / 4096) + 1);
data->transactionTagCounter.increment(req.tags, (resultSize / 4096) + 1);
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
@ -1519,7 +1519,7 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
}
data->tagCounters.increment(req.tags, (resultSize / 4096) + 1);
data->transactionTagCounter.increment(req.tags, (resultSize / 4096) + 1);
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
@ -1579,7 +1579,7 @@ ACTOR Future<Void> getKey( StorageServer* data, GetKeyRequest req ) {
data->sendErrorWithPenalty(req.reply, e, data->getPenalty());
}
data->tagCounters.increment(req.tags, (resultSize/4096) + 1);
data->transactionTagCounter.increment(req.tags, (resultSize/4096) + 1);
++data->counters.finishedQueries;
--data->readQueueSizeMetric;
if(data->latencyBandConfig.present()) {
@ -1607,8 +1607,8 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const&
reply.diskUsage = self->diskUsage;
reply.durableVersion = self->durableVersion.get();
Optional<StorageServer::TagCounters::TagInfo> busiestTag = self->tagCounters.getBusiestTag();
reply.busiestTag = busiestTag.map<Standalone<StringRef>>([](StorageServer::TagCounters::TagInfo tagInfo) { return tagInfo.tag; });
Optional<StorageServer::TransactionTagCounter::TagInfo> busiestTag = self->transactionTagCounter.getBusiestTag();
reply.busiestTag = busiestTag.map<TransactionTag>([](StorageServer::TransactionTagCounter::TagInfo tagInfo) { return tagInfo.tag; });
reply.busiestTagFractionalBusyness = busiestTag.present() ? busiestTag.get().fractionalBusyness : 0.0;
reply.busiestTagRate = busiestTag.present() ? busiestTag.get().count / busiestTag.get().elapsed : 0.0;
@ -3619,8 +3619,8 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
actors.add(logLongByteSampleRecovery(self->byteSampleRecovery));
actors.add(checkBehind(self));
self->tagCounters.startNewInterval(self->thisServerID);
actors.add(recurring([this](){ self->tagCounters.startNewInterval(self->thisServerID); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL));
self->transactionTagCounter.startNewInterval(self->thisServerID);
actors.add(recurring([this](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
self->coreStarted.send( Void() );