diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 573639484e..638f7d25fe 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -3808,15 +3808,15 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { } } - std::map, TagThrottleInfo> tags = wait(ThrottleApi::getTags(db, throttleListLimit)); + std::map tags = wait(ThrottleApi::getTags(db, throttleListLimit)); if(tags.size() > 0) { printf("Throttled tags:\n\n"); - printf(" Rate | Expiration (s) | Priority | Type | Tag\n"); - printf(" ------+----------------+-----------+--------+------------------\n"); + printf(" Rate (txn/s) | Expiration (s) | Priority | Type | Tag\n"); + printf(" --------------+----------------+-----------+--------+------------------\n"); for(auto itr = tags.begin(); itr != tags.end(); ++itr) { - printf(" %3d%% | %13ds | %9s | %6s | %s\n", - (int)(itr->second.rate*100), + printf(" %12d | %13ds | %9s | %6s | %s\n", + (int)(itr->second.tpsRate), (int)(itr->second.expiration-now()), TagThrottleInfo::priorityToString(itr->second.priority, false), itr->second.autoThrottled ? "auto" : "manual", @@ -3837,25 +3837,25 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { printf("Usage: throttle on tag [RATE] [DURATION]\n"); printf("\n"); printf("Enables throttling for transactions with the specified tag.\n"); - printf("An optional throttling rate (out of 1.0) can be specified (default 1.0).\n"); + printf("An optional transactions per second rate can be specified (default 0).\n"); printf("An optional duration can be specified, which must include a time suffix (s, m, h, d) (default 1h).\n"); is_error = true; continue; } - double rate = 1.0; + double tpsRate = 0.0; uint64_t duration = 3600; if(tokens.size() >= 5) { char *end; - rate = std::strtod((const char*)tokens[4].begin(), &end); + tpsRate = std::strtod((const char*)tokens[4].begin(), &end); if((tokens.size() > 5 && !std::isspace(*end)) || (tokens.size() == 5 && *end != '\0')) { printf("ERROR: failed to parse rate `%s'.\n", printable(tokens[4]).c_str()); is_error = true; continue; } - if(rate <= 0 || rate > 1) { - printf("ERROR: invalid rate `%f'; must satisfy 0 < rate <= 1\n", rate); + if(tpsRate < 0) { + printf("ERROR: rate cannot be negative `%f'\n", tpsRate); is_error = true; continue; } @@ -3870,7 +3870,7 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { duration = parsedDuration.get(); } - wait(ThrottleApi::throttleTag(db, tokens[3], rate, now()+duration, true, false)); + wait(ThrottleApi::throttleTag(db, tokens[3], tpsRate, now()+duration, true, false)); printf("Tag `%s' has been throttled\n", tokens[3].toString().c_str()); } else if(tokencmp(tokens[1], "off")) { diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 62413cea7a..02311b8384 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -30,6 +30,7 @@ #include "flow/TDMetric.actor.h" #include "fdbclient/EventTypes.actor.h" #include "fdbrpc/ContinuousSample.h" +#include "fdbrpc/Smoother.h" class StorageServerInfo : public ReferencedInterface { public: @@ -45,6 +46,51 @@ private: typedef MultiInterface> LocationInfo; typedef MultiInterface ProxyInfo; +class ClientTagThrottleData { +private: + double tpsRate; + Smoother smoothRate; + Smoother smoothReleased; + +public: + double expiration; + + ClientTagThrottleData(double tpsRate, double expiration) + : tpsRate(tpsRate), expiration(expiration), smoothRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW), + smoothReleased(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) + { + ASSERT(tpsRate >= 0); + smoothRate.reset(tpsRate); + } + + void updateRate(double tpsRate) { + ASSERT(tpsRate >= 0); + this->tpsRate = tpsRate; + smoothRate.setTotal(tpsRate); + } + + void addReleased(int released) { + smoothReleased.addDelta(released); + } + + double throttleDuration() { + if(expiration <= now()) { + return 0.0; + } + + double capacity = (smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW; + if(capacity >= 1) { + return 0.0; + } + + if(tpsRate == 0) { + return std::max(0.0, expiration - now()); + } + + return std::min(expiration - now(), capacity / tpsRate); + } +}; + class DatabaseContext : public ReferenceCounted, public FastAllocated, NonCopyable { public: static DatabaseContext* allocateOnForeignThread() { @@ -167,7 +213,7 @@ public: UID dbId; bool internal; // Only contexts created through the C client and fdbcli are non-internal - PrioritizedTagThrottleMap throttledTags; + PrioritizedTagThrottleMap throttledTags; CounterCollection cc; diff --git a/fdbclient/Knobs.cpp b/fdbclient/Knobs.cpp index ddb33618bc..cfaae58fcf 100644 --- a/fdbclient/Knobs.cpp +++ b/fdbclient/Knobs.cpp @@ -220,6 +220,7 @@ ClientKnobs::ClientKnobs(bool randomize) { init( MAX_TAGS_PER_TRANSACTION, 100 ); init( MAX_TRANSACTION_TAG_LENGTH, 100 ); init( READ_TAG_SAMPLE_RATE, 1.0 ); // Communicated to clients from cluster + init( TAG_THROTTLE_SMOOTHING_WINDOW, 5.0 ); // clang-format on } diff --git a/fdbclient/Knobs.h b/fdbclient/Knobs.h index e40ac0ba08..298aa29bad 100644 --- a/fdbclient/Knobs.h +++ b/fdbclient/Knobs.h @@ -209,6 +209,7 @@ public: int MAX_TRANSACTION_TAG_LENGTH; int MAX_TAGS_PER_TRANSACTION; double READ_TAG_SAMPLE_RATE; // Communicated to clients from cluster + double TAG_THROTTLE_SMOOTHING_WINDOW; ClientKnobs(bool randomize = false); }; diff --git a/fdbclient/MasterProxyInterface.h b/fdbclient/MasterProxyInterface.h index c1e6f29686..b1b75f2825 100644 --- a/fdbclient/MasterProxyInterface.h +++ b/fdbclient/MasterProxyInterface.h @@ -165,7 +165,7 @@ struct GetReadVersionReply { bool locked; Optional metadataVersion; - TagThrottleMap tagThrottleInfo; + TagThrottleMap tagThrottleInfo; GetReadVersionReply() : version(invalidVersion), locked(false) {} @@ -191,12 +191,14 @@ struct GetReadVersionRequest : TimedRequest { uint32_t transactionCount; uint32_t flags; - TagSet tags; + + std::map tags; // TODO: compact + Optional debugID; ReplyPromise reply; GetReadVersionRequest() : transactionCount( 1 ), flags( PRIORITY_DEFAULT ) {} - GetReadVersionRequest( uint32_t transactionCount, uint32_t flags, TagSet tags = TagSet(), Optional debugID = Optional() ) + GetReadVersionRequest( uint32_t transactionCount, uint32_t flags, std::map tags = std::map(), Optional debugID = Optional() ) : transactionCount( transactionCount ), flags( flags ), tags(tags), debugID( debugID ) {} int priority() const { return flags & FLAG_PRIORITY_MASK; } diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 5d5cd5bb02..678a862d74 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -3169,7 +3169,7 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, TagSet tags, Optional debugID ) { +ACTOR Future getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, std::map tags, Optional debugID ) { try { ++cx->transactionReadVersionBatches; if( debugID.present() ) @@ -3180,8 +3180,12 @@ ACTOR Future getConsistentReadVersion( DatabaseContext *cx, when ( wait( cx->onMasterProxiesChanged() ) ) {} when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) { auto &priorityThrottledTags = cx->throttledTags[TagThrottleInfo::priorityFromReadVersionFlags(flags)]; - for(auto tag : v.tagThrottleInfo) { - priorityThrottledTags[tag.first] = tag.second; + for(auto tag : v.tagThrottleInfo) { // TODO: remove any that aren't sent back? + auto result = priorityThrottledTags.try_emplace(tag.first, tag.second.tpsRate, tag.second.expiration); + if(!result.second) { + result.first->second.updateRate(tag.second.tpsRate); + result.first->second.expiration = tag.second.expiration; + } } if( debugID.present() ) @@ -3207,7 +3211,7 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStream debugID; state bool send_batch; - state TagSet tags; + state std::map tags; // dynamic batching state PromiseStream replyTimes; @@ -3226,7 +3230,7 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStreamMAX_BATCH_SIZE) @@ -3254,7 +3258,7 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStream>(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE); - tags = TagSet(); + tags.clear(); debugID = Optional(); requests = std::vector< Promise >(); addActor.send(batch); @@ -3263,7 +3267,7 @@ ACTOR Future readVersionBatcher( DatabaseContext *cx, FutureStream extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference trLogInfo, Future f, bool lockAware, double startTime, Promise> metadataVersion, TagSet tags, double throttledRate) { +ACTOR Future extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference trLogInfo, Future f, bool lockAware, double startTime, Promise> metadataVersion, TagSet tags) { GetReadVersionReply rep = wait(f); double latency = now() - startTime; cx->GRVLatencies.addSample(latency); @@ -3278,23 +3282,24 @@ ACTOR Future extractReadVersion(DatabaseContext* cx, uint32_t flags, Re ++cx->transactionReadVersionsCompleted; auto &priorityThrottledTags = cx->throttledTags[TagThrottleInfo::priorityFromReadVersionFlags(flags)]; - double newMaxThrottle = throttledRate; for(auto tag : tags) { auto itr = priorityThrottledTags.find(tag); if(itr != priorityThrottledTags.end()) { - if(itr->second.expiration > now()) { - newMaxThrottle = std::max(newMaxThrottle, itr->second.rate); - } - else { + if(itr->second.expiration <= now()) { priorityThrottledTags.erase(itr); } + else if(itr->second.throttleDuration() > 0) { + ++cx->transactionReadVersionsThrottled; + throw tag_throttled(); + } } } - // If the throttle rate is higher now than when we made the request, we may still need to throttle this response - if(newMaxThrottle > throttledRate && deterministicRandom()->random01() < 1.0 - (1.0-newMaxThrottle)/(1.0-throttledRate)) { - ++cx->transactionReadVersionsThrottled; - throw tag_throttled(); + for(auto tag : tags) { + auto itr = priorityThrottledTags.find(tag); + if(itr != priorityThrottledTags.end()) { + itr->second.addReleased(1); + } } if(rep.version > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) { @@ -3309,32 +3314,28 @@ ACTOR Future extractReadVersion(DatabaseContext* cx, uint32_t flags, Re Future Transaction::getReadVersion(uint32_t flags) { if (!readVersion.isValid()) { ++cx->transactionReadVersions; - TagThrottleInfo::Priority priority; flags |= options.getReadVersionFlags; if((flags & GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) { - priority = TagThrottleInfo::Priority::IMMEDIATE; ++cx->transactionImmediateReadVersions; } else if((flags & GetReadVersionRequest::PRIORITY_DEFAULT) == GetReadVersionRequest::PRIORITY_DEFAULT) { - priority = TagThrottleInfo::Priority::DEFAULT; ++cx->transactionDefaultReadVersions; } else if((flags & GetReadVersionRequest::PRIORITY_BATCH) == GetReadVersionRequest::PRIORITY_BATCH) { - priority = TagThrottleInfo::Priority::BATCH; ++cx->transactionBatchReadVersions; } else { ASSERT(false); } - double maxThrottle = 0; + double maxThrottleDelay = 0.0; if(options.tags.size() != 0) { - auto priorityThrottledTags = cx->throttledTags[priority]; + auto priorityThrottledTags = cx->throttledTags[TagThrottleInfo::priorityFromReadVersionFlags(flags)]; for(auto tag : options.tags) { auto itr = priorityThrottledTags.find(tag); if(itr != priorityThrottledTags.end()) { if(itr->second.expiration > now()) { - maxThrottle = std::max(maxThrottle, itr->second.rate); + maxThrottleDelay = std::max(maxThrottleDelay, itr->second.throttleDuration()); } else { priorityThrottledTags.erase(itr); @@ -3343,8 +3344,7 @@ Future Transaction::getReadVersion(uint32_t flags) { } } - // TODO: can we protect against client spamming this request (e.g. set absolute limits or delay for all transactions starting with throttle tag) - if(maxThrottle > 0 && deterministicRandom()->random01() < maxThrottle) { + if(maxThrottleDelay > 0.0) { // TODO: allow delaying? ++cx->transactionReadVersionsThrottled; return Future(tag_throttled()); } @@ -3357,7 +3357,7 @@ Future Transaction::getReadVersion(uint32_t flags) { auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID); batcher.stream.send(req); startTime = now(); - readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags, maxThrottle); + readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags); } return readVersion; } diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 8ac8871027..3e86577a55 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -429,7 +429,7 @@ struct StorageQueuingMetricsReply { double cpuUsage; double diskUsage; double localRateLimit; - Optional> busiestTag; + Optional busiestTag; double busiestTagFractionalBusyness; double busiestTagRate; diff --git a/fdbclient/TagThrottle.actor.cpp b/fdbclient/TagThrottle.actor.cpp index b024f27a85..f4ad943007 100644 --- a/fdbclient/TagThrottle.actor.cpp +++ b/fdbclient/TagThrottle.actor.cpp @@ -24,7 +24,7 @@ #include "flow/actorcompiler.h" // has to be last include -void TagSet::addTag(StringRef tag) { +void TagSet::addTag(TransactionTagRef tag) { ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); // Tag length is encoded with a single byte if(tag.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) { @@ -34,7 +34,7 @@ void TagSet::addTag(StringRef tag) { throw too_many_tags(); } - tags.insert(StringRef(arena, tag)); + tags.insert(TransactionTagRef(arena, tag)); bytes += tag.size(); } @@ -70,7 +70,7 @@ namespace ThrottleApi { // The tags are listed in sorted order // // Currently, the throttle API supports only 1 tag per throttle - Standalone throttleKeyForTags(std::set const& tags) { + Key throttleKeyForTags(std::set const& tags) { ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); ASSERT(tags.size() > 0); @@ -101,19 +101,19 @@ namespace ThrottleApi { return result; } - StringRef tagFromThrottleKey(KeyRef key) { - StringRef tag = key.substr(tagThrottleKeysPrefix.size()+1); + TransactionTagRef tagFromThrottleKey(KeyRef key) { + TransactionTagRef tag = key.substr(tagThrottleKeysPrefix.size()+1); ASSERT(tag.size() == key.begin()[tagThrottleKeysPrefix.size()]); // TODO: support multiple tags per throttle return tag; } - ACTOR Future, TagThrottleInfo>> getTags(Database db, int limit) { + ACTOR Future> getTags(Database db, int limit) { state Transaction tr(db); loop { try { Standalone tags = wait(tr.getRange(tagThrottleKeys, limit)); - std::map, TagThrottleInfo> results; + std::map results; for(auto tag : tags) { results[tagFromThrottleKey(tag.key)] = decodeTagThrottleValue(tag.value); } @@ -125,11 +125,11 @@ namespace ThrottleApi { } } - ACTOR Future throttleTag(Database db, Standalone tag, double rate, double expiration, bool serializeExpirationAsDuration, bool autoThrottled) { + ACTOR Future throttleTag(Database db, TransactionTagRef tag, double tpsRate, double expiration, bool serializeExpirationAsDuration, bool autoThrottled) { state Transaction tr(db); - state Key key = throttleKeyForTags(std::set{ tag }); + state Key key = throttleKeyForTags(std::set{ tag }); - TagThrottleInfo throttle(rate, expiration, autoThrottled, TagThrottleInfo::Priority::DEFAULT, serializeExpirationAsDuration); + TagThrottleInfo throttle(tpsRate, expiration, autoThrottled, TagThrottleInfo::Priority::DEFAULT, serializeExpirationAsDuration); BinaryWriter wr(IncludeVersion()); wr << throttle; state Value value = wr.toValue(); @@ -151,9 +151,9 @@ namespace ThrottleApi { } } - ACTOR Future unthrottleTag(Database db, Standalone tag) { + ACTOR Future unthrottleTag(Database db, TransactionTagRef tag) { state Transaction tr(db); - state Key key = throttleKeyForTags(std::set{ tag }); + state Key key = throttleKeyForTags(std::set{ tag }); loop { try { diff --git a/fdbclient/TagThrottle.h b/fdbclient/TagThrottle.h index 38905623e5..3bd0fc972d 100644 --- a/fdbclient/TagThrottle.h +++ b/fdbclient/TagThrottle.h @@ -32,6 +32,9 @@ class Database; +typedef StringRef TransactionTagRef; +typedef Standalone TransactionTag; + struct TagThrottleInfo { enum class Priority { BATCH, @@ -55,22 +58,22 @@ struct TagThrottleInfo { static Priority priorityFromReadVersionFlags(int flags); - double rate; + double tpsRate; double expiration; bool autoThrottled; Priority priority; bool serializeExpirationAsDuration; - TagThrottleInfo() : rate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT), serializeExpirationAsDuration(true) {} - TagThrottleInfo(double rate, double expiration, bool autoThrottled, Priority priority, bool serializeExpirationAsDuration) - : rate(rate), expiration(expiration), autoThrottled(autoThrottled), priority(priority), + TagThrottleInfo() : tpsRate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT), serializeExpirationAsDuration(true) {} + TagThrottleInfo(double tpsRate, double expiration, bool autoThrottled, Priority priority, bool serializeExpirationAsDuration) + : tpsRate(tpsRate), expiration(expiration), autoThrottled(autoThrottled), priority(priority), serializeExpirationAsDuration(serializeExpirationAsDuration) {} template void serialize(Ar& ar) { if(ar.isDeserializing) { - serializer(ar, rate, expiration, autoThrottled, priority, serializeExpirationAsDuration); + serializer(ar, tpsRate, expiration, autoThrottled, priority, serializeExpirationAsDuration); if(serializeExpirationAsDuration) { expiration += now(); } @@ -81,7 +84,7 @@ struct TagThrottleInfo { serializedExpiration = std::max(expiration - now(), 0.0); } - serializer(ar, rate, serializedExpiration, autoThrottled, priority, serializeExpirationAsDuration); + serializer(ar, tpsRate, serializedExpiration, autoThrottled, priority, serializeExpirationAsDuration); } } }; @@ -90,11 +93,11 @@ BINARY_SERIALIZABLE(TagThrottleInfo::Priority); class TagSet { public: - typedef std::set::const_iterator const_iterator; + typedef std::set::const_iterator const_iterator; TagSet() : bytes(0) {} - void addTag(StringRef tag); + void addTag(TransactionTagRef tag); size_t size(); const_iterator begin() const { @@ -107,7 +110,7 @@ public: //private: Arena arena; // TODO: where to hold this memory? - std::set tags; + std::set tags; size_t bytes; }; @@ -139,7 +142,7 @@ struct dynamic_size_traits : std::true_type { while(data < end) { uint8_t len = *data; ++data; - StringRef tag(context.tryReadZeroCopy(data, len), len); + TransactionTagRef tag(context.tryReadZeroCopy(data, len), len); data += len; t.tags.insert(tag); @@ -150,20 +153,23 @@ struct dynamic_size_traits : std::true_type { } }; -typedef std::unordered_map, TagThrottleInfo, std::hash> TagThrottleMap; -typedef std::map PrioritizedTagThrottleMap; +template +using TagThrottleMap = std::unordered_map>; + +template +using PrioritizedTagThrottleMap = std::map>; namespace ThrottleApi { // Currently, only 1 tag in a key is supported - Standalone throttleKeyForTags(std::set const& tags); - StringRef tagFromThrottleKey(KeyRef key); + Key throttleKeyForTags(std::set const& tags); + TransactionTagRef tagFromThrottleKey(KeyRef key); - Future, TagThrottleInfo>> getTags(Database const& db, int const& limit); + Future> getTags(Database const& db, int const& limit); - Future throttleTag(Database const& db, Standalone const& tag, double const& rate, double const& expiration, + Future throttleTag(Database const& db, TransactionTagRef const& tag, double const& tpsRate, double const& expiration, bool const& serializeExpirationAsDuration, bool const& autoThrottled); // TODO: priorities - Future unthrottleTag(Database const& db, Standalone const& tag); + Future unthrottleTag(Database const& db, TransactionTagRef const& tag); Future unthrottleManual(Database db); Future unthrottleAuto(Database db); diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 4760fae6db..c8433b9cc7 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -516,7 +516,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( BEHIND_CHECK_VERSIONS, 5 * VERSIONS_PER_SECOND ); init( WAIT_METRICS_WRONG_SHARD_CHANCE, isSimulated ? 1.0 : 0.1 ); init( MIN_TAG_PAGES_READ_RATE, 1.0e4 ); if( randomize && BUGGIFY ) MIN_TAG_PAGES_READ_RATE = 0; - init( TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) TAG_MEASUREMENT_INTERVAL = 1.0; + init( READ_TAG_MEASUREMENT_INTERVAL, 30.0 ); if( randomize && BUGGIFY ) READ_TAG_MEASUREMENT_INTERVAL = 1.0; //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 9c2950e28b..1f39e5cc82 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -384,7 +384,7 @@ public: double TLOG_IGNORE_POP_AUTO_ENABLE_DELAY; int64_t MAX_THROTTLED_TAGS; - int64_t MIN_TAG_BUSYNESS; + double MIN_TAG_BUSYNESS; double TAG_THROTTLE_DURATION; bool AUTO_TAG_THROTTLING_ENABLED; @@ -455,7 +455,7 @@ public: int64_t BEHIND_CHECK_VERSIONS; double WAIT_METRICS_WRONG_SHARD_CHANCE; int64_t MIN_TAG_PAGES_READ_RATE; - double TAG_MEASUREMENT_INTERVAL; + double READ_TAG_MEASUREMENT_INTERVAL; //Wait Failure int MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS; diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index f8e9ee0eed..6ef256bc91 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -99,7 +99,7 @@ struct ProxyStats { ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate, double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply, - PrioritizedTagThrottleMap *throttledTags) { + TagThrottleMap* transactionTagCounter, PrioritizedTagThrottleMap* throttledTags) { state Future nextRequestTimer = Never(); state Future leaseTimeout = Never(); state Future reply = Never(); @@ -123,7 +123,8 @@ ACTOR Future getRate(UID myID, Reference> db, int64 when ( wait( nextRequestTimer ) ) { nextRequestTimer = Never(); bool detailed = now() - lastDetailedReply > SERVER_KNOBS->DETAILED_METRIC_UPDATE_RATE; - reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, detailed))); + reply = brokenPromiseToNever(db->get().ratekeeper.get().getRateInfo.getReply(GetRateInfoRequest(myID, *inTransactionCount, *inBatchTransactionCount, *transactionTagCounter, detailed))); + transactionTagCounter->clear(); expectingDetailedReply = detailed; } when ( GetRateInfoReply rep = wait(reply) ) { @@ -140,7 +141,12 @@ ACTOR Future getRate(UID myID, Reference> db, int64 lastDetailedReply = now(); } - *throttledTags = std::move(rep.throttledTags); + for(auto priorityTags : rep.throttledTags) { + auto &localPriorityTags = (*throttledTags)[priorityTags.first]; + for(auto tag : priorityTags.second) { // TODO: remove missing tags + localPriorityTags[tag.first] = tag.second; + } + } } when ( wait( leaseTimeout ) ) { *outTransactionRate = 0; @@ -180,7 +186,9 @@ ACTOR Future queueTransactionStartRequests( FutureStream readVersionRequests, PromiseStream GRVTimer, double *lastGRVTime, double *GRVBatchTime, FutureStream replyTimes, - ProxyStats* stats, TransactionRateInfo* batchRateInfo) + ProxyStats* stats, TransactionRateInfo* batchRateInfo, + PrioritizedTagThrottleMap* throttledTags, + TagThrottleMap* transactionTagCounter) { loop choose{ when(GetReadVersionRequest req = waitNext(readVersionRequests)) { @@ -194,6 +202,11 @@ ACTOR Future queueTransactionStartRequests( req.reply.send(rep); TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60); } else { + // TODO: this probably needs to happen outside the high priority path + for(auto tag : req.tags) { + transactionTagCounter[tag] += req.transactionCount; + } + if (req.debugID.present()) g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before"); @@ -1292,7 +1305,7 @@ ACTOR Future getLiveCommittedVersion(ProxyCommitData* commi } ACTOR Future sendGrvReplies(Future replyFuture, std::vector requests, - ProxyStats* stats, Version minKnownCommittedVersion, PrioritizedTagThrottleMap throttledTags) { + ProxyStats* stats, Version minKnownCommittedVersion, PrioritizedTagThrottleMap throttledTags) { GetReadVersionReply _baseReply = wait(replyFuture); GetReadVersionReply baseReply = _baseReply; double end = g_network->timer(); @@ -1315,13 +1328,13 @@ ACTOR Future sendGrvReplies(Future replyFuture, std:: auto itr = throttledTags.find(TagThrottleInfo::priorityFromReadVersionFlags(request.flags)); ASSERT(itr != throttledTags.end()); - auto tagItr = itr->second.find(tag); + auto tagItr = itr->second.find(tag.first); while(tagItr == itr->second.end() && itr != throttledTags.begin()) { --itr; } if(tagItr != itr->second.end()) { - reply.tagThrottleInfo[tag] = tagItr->second; + reply.tagThrottleInfo[tag.first] = tagItr->second; } } @@ -1353,12 +1366,13 @@ ACTOR static Future transactionStarter( state Deque batchQueue; state vector otherProxies; - state PrioritizedTagThrottleMap throttledTags; + state TagThrottleMap transactionTagCounter; + state PrioritizedTagThrottleMap throttledTags; state PromiseStream replyTimes; - addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, &throttledTags)); + addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags)); addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(), - GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo)); + GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo, &throttledTags, &transactionTagCounter)); // Get a list of the other proxies that go together with us while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end()) diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index b6f700043c..a08f9b3c73 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -96,7 +96,7 @@ struct StorageQueueInfo { Smoother smoothTotalSpace; limitReason_t limitReason; - Optional> busiestTag; + Optional busiestTag; double busiestTagFractionalBusyness; double busiestTagRate; @@ -128,6 +128,58 @@ struct TLogQueueInfo { } }; +class RkTagThrottleData : NonCopyable { +private: + Smoother clientRate; + Smoother smoothRate; + Smoother smoothRequests; + +public: + double tpsRate; + double expiration; + int64_t count; + + RkTagThrottleData(double tpsRate, double expiration) + : tpsRate(tpsRate), expiration(expiration), smoothRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW), + smoothRequests(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW), clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW), count(0) + { + ASSERT(tpsRate >= 0); + smoothRate.reset(tpsRate); + clientRate.reset(tpsRate); + } + + void updateClientRate() { + double startClientRate = clientRate.smoothTotal(); + if(tpsRate == 0.0) { + clientRate.reset(0.0); + } + else if(smoothRequests.smoothRate() == 0) { + clientRate.setTotal(smoothRate.smoothTotal()); + } + else { + double targetRate = smoothRate.smoothTotal(); + clientRate.setTotal(std::min(targetRate, (targetRate / smoothRequests.smoothRate()) * clientRate.smoothTotal())); + } + } + + void updateRate(double tpsRate) { + ASSERT(tpsRate >= 0); + this->tpsRate = tpsRate; + smoothRate.setTotal(tpsRate); + + updateClientRate(); + } + + void addRequests(int requests) { + smoothRequests.addDelta(requests); + updateClientRate(); + } + + double getClientRate() { + return clientRate.smoothTotal(); + } +}; + struct RatekeeperLimits { double tpsLimit; Int64MetricHandle tpsLimitMetric; @@ -187,7 +239,7 @@ struct RatekeeperData { double lastWarning; double lastSSListFetchedTimestamp; - PrioritizedTagThrottleMap tagThrottles; + PrioritizedTagThrottleMap throttledTags; RatekeeperLimits normalLimits; RatekeeperLimits batchLimits; @@ -204,9 +256,9 @@ struct RatekeeperData { batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH), autoThrottlingEnabled(false) { - tagThrottles.try_emplace(TagThrottleInfo::Priority::IMMEDIATE); - tagThrottles.try_emplace(TagThrottleInfo::Priority::DEFAULT); - tagThrottles.try_emplace(TagThrottleInfo::Priority::BATCH); + throttledTags.try_emplace(TagThrottleInfo::Priority::IMMEDIATE); + throttledTags.try_emplace(TagThrottleInfo::Priority::DEFAULT); + throttledTags.try_emplace(TagThrottleInfo::Priority::BATCH); } }; @@ -407,15 +459,11 @@ ACTOR Future monitorThrottlingChanges(RatekeeperData *self) { self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED; } - TraceEvent("RatekeeperReadThrottles").detail("NumThrottledTags", throttledTags.get().size()); - PrioritizedTagThrottleMap newThrottles; - std::map> oldThrottleIterators; - for(auto t : self->tagThrottles) { - oldThrottleIterators[t.first] = std::make_pair(t.second.begin(), t.second.end()); - } + PrioritizedTagThrottleMap updatedTagThrottles; + TraceEvent("RatekeeperReadThrottles").detail("NumThrottledTags", throttledTags.get().size()); for(auto entry : throttledTags.get()) { - StringRef tag = ThrottleApi::tagFromThrottleKey(entry.key); + TransactionTagRef tag = ThrottleApi::tagFromThrottleKey(entry.key); TagThrottleInfo throttleInfo = ThrottleApi::decodeTagThrottleValue(entry.value); TraceEvent("RatekeeperReadThrottleRead").detail("Tag", tag).detail("Expiration", throttleInfo.expiration); if((!self->autoThrottlingEnabled && throttleInfo.autoThrottled) || throttleInfo.expiration <= now()) { // TODO: keep or delete auto throttles when disabling auto-throttling @@ -432,25 +480,38 @@ ACTOR Future monitorThrottlingChanges(RatekeeperData *self) { tr.set(entry.key, value); } - auto oldItr = oldThrottleIterators[throttleInfo.priority]; - while(oldItr.first != oldItr.second && oldItr.first->first < tag) { - ++oldItr.first; - } - - if(oldItr.first == oldItr.second || oldItr.first->first != tag || oldItr.first->second.rate < throttleInfo.rate * 0.95) { - TraceEvent("RatekeeperDetectedThrottle") + auto &priorityThrottledTags = self->throttledTags[throttleInfo.priority]; + auto itr = priorityThrottledTags.find(tag); + if(itr == priorityThrottledTags.end()) { + TraceEvent("RatekeeperAddingThrottle") .detail("Tag", tag) - .detail("Rate", throttleInfo.rate) + .detail("Rate", throttleInfo.tpsRate) .detail("Priority", TagThrottleInfo::priorityToString(throttleInfo.priority)) .detail("SecondsToExpiration", throttleInfo.expiration - now()) .detail("AutoThrottled", throttleInfo.autoThrottled); - } - newThrottles[throttleInfo.priority][tag] = throttleInfo; + updatedTagThrottles[throttleInfo.priority].try_emplace(tag, throttleInfo.tpsRate, throttleInfo.expiration); + } + else { + auto node = priorityThrottledTags.extract(itr); + if(node.mapped().tpsRate != throttleInfo.tpsRate || node.mapped().expiration != throttleInfo.expiration) { + TraceEvent("RatekeeperUpdatingThrottle") + .detail("Tag", tag) + .detail("Rate", throttleInfo.tpsRate) + .detail("Priority", TagThrottleInfo::priorityToString(throttleInfo.priority)) + .detail("SecondsToExpiration", throttleInfo.expiration - now()) + .detail("AutoThrottled", throttleInfo.autoThrottled); + + node.mapped().updateRate(throttleInfo.tpsRate); + node.mapped().expiration = throttleInfo.expiration; + } + + updatedTagThrottles[throttleInfo.priority].insert(std::move(node)); + } } } - self->tagThrottles = newThrottles; + self->throttledTags = std::move(updatedTagThrottles); state Future watchFuture = tr.watch(tagThrottleSignalKey); wait(tr.commit()); @@ -464,7 +525,7 @@ ACTOR Future monitorThrottlingChanges(RatekeeperData *self) { } } -void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TagThrottleMap& throttledTags) { +void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TagThrottleMap& throttledTags) { //double controlFactor = ; // dt / eFoldingTime double actualTps = self->smoothReleasedTransactions.smoothRate(); @@ -532,27 +593,27 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TagThrottleMap& double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0); // TODO: limit the number of throttles active for a storage server - if(storageQueue > targetBytes / 2.0 && ss.busiestTag.present() && ss.busiestTagFractionalBusyness > 0.2 + /*if(storageQueue > targetBytes / 2.0 && ss.busiestTag.present() && ss.busiestTagFractionalBusyness > 0.2 && ss.busiestTagRate > 1000 && throttledTags.size() < 10) { auto &throttle = throttledTags[ss.busiestTag.get()]; double throttleRate = (storageQueue - targetBytes / 2.0) / targetBytes / 2.0; - if(throttle.expiration <= now() || throttle.rate < throttleRate * 0.95) { + if(throttle.expiration <= now() || throttle.tpsRate > throttleRate * 0.95) { TraceEvent(format("RatekeeperThrottlingTag%s", limits->context).c_str()) .detail("Tag", ss.busiestTag.get()) .detail("ThrottleRate", throttleRate); } if(throttle.expiration <= now()) { - throttle.rate = throttleRate; + throttle.tpsRate = throttleRate; } else { - throttle.expiration = std::max(throttle.rate, throttleRate); + throttle.expiration = std::max(throttle.tpsRate, throttleRate); } throttle.expiration = now() + 120.0; - } + }*/ double inputRate = ss.smoothInputBytes.smoothRate(); //inputRate = std::max( inputRate, actualTps / SERVER_KNOBS->MAX_TRANSACTIONS_PER_BYTE ); @@ -884,8 +945,8 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, Reference SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit; double tooOld = now() - 1.0; @@ -917,16 +978,25 @@ ACTOR Future ratekeeper(RatekeeperInterface rkInterf, ReferenceMETRIC_UPDATE_RATE; + for(auto tag : req.throttledTagCounts) { // TODO: add only if this data is recent? + for(auto &priorityItr : self.throttledTags) { // TODO: we don't need this loop + auto itr = priorityItr.second.find(tag.first); + if(itr != priorityItr.second.end()) { + itr->second.addRequests(tag.second); + } + } + } + // TODO: avoid iteration every time - for(auto priorityItr = self.tagThrottles.begin(); priorityItr != self.tagThrottles.end(); ++priorityItr) { - auto &priorityTags = reply.throttledTags[priorityItr->first]; - for(auto tagItr = priorityItr->second.begin(); tagItr != priorityItr->second.end();) { + for(auto &priorityItr : self.throttledTags) { + auto &priorityTags = reply.throttledTags[priorityItr.first]; + for(auto tagItr = priorityItr.second.begin(); tagItr != priorityItr.second.end();) { if(tagItr->second.expiration > now()) { - priorityTags[tagItr->first] = tagItr->second; + auto result = priorityTags.insert_or_assign(tagItr->first, TagThrottleInfo(tagItr->second.getClientRate(), tagItr->second.expiration, false, priorityItr.first, true)); // TODO: Different structure? ++tagItr; } else { - tagItr = priorityItr->second.erase(tagItr); + tagItr = priorityItr.second.erase(tagItr); } } } diff --git a/fdbserver/RatekeeperInterface.h b/fdbserver/RatekeeperInterface.h index 80eeeb7b9b..64dc4055c9 100644 --- a/fdbserver/RatekeeperInterface.h +++ b/fdbserver/RatekeeperInterface.h @@ -59,7 +59,7 @@ struct GetRateInfoReply { double leaseDuration; HealthMetrics healthMetrics; - PrioritizedTagThrottleMap throttledTags; + PrioritizedTagThrottleMap throttledTags; template void serialize(Ar& ar) { @@ -72,16 +72,18 @@ struct GetRateInfoRequest { UID requesterID; int64_t totalReleasedTransactions; int64_t batchReleasedTransactions; + + TagThrottleMap throttledTagCounts; bool detailed; ReplyPromise reply; GetRateInfoRequest() {} - GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, bool detailed) - : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), detailed(detailed) {} + GetRateInfoRequest(UID const& requesterID, int64_t totalReleasedTransactions, int64_t batchReleasedTransactions, TagThrottleMap throttledTagCounts, bool detailed) + : requesterID(requesterID), totalReleasedTransactions(totalReleasedTransactions), batchReleasedTransactions(batchReleasedTransactions), throttledTagCounts(throttledTagCounts), detailed(detailed) {} template void serialize(Ar& ar) { - serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, detailed, reply); + serializer(ar, requesterID, totalReleasedTransactions, batchReleasedTransactions, throttledTagCounts, detailed, reply); } }; diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 11f9932fd8..656b503e9c 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -449,20 +449,20 @@ public: return val; } - struct TagCounters { + struct TransactionTagCounter { struct TagInfo { - Standalone tag; + TransactionTag tag; int64_t count; double fractionalBusyness; double elapsed; - TagInfo(Standalone const& tag, int64_t count, int64_t totalCount, double elapsed) + TagInfo(TransactionTag const& tag, int64_t count, int64_t totalCount, double elapsed) : tag(tag), count(count), fractionalBusyness((double)count/totalCount), elapsed(elapsed) {} }; - std::unordered_map, int64_t, std::hash> intervalCounts; + TagThrottleMap intervalCounts; int64_t intervalTotalSampledCount = 0; - Standalone busiestTag; + TransactionTag busiestTag; int64_t busiestTagCount = 0; double intervalStart = 0; @@ -471,7 +471,7 @@ public: void increment(Optional const& tags, int64_t delta) { if(tags.present()) { for(auto& tag : tags.get()) { - int64_t &count = intervalCounts[Standalone(tag, tags.get().arena)]; + int64_t &count = intervalCounts[TransactionTag(tag, tags.get().arena)]; count += delta; if(count > busiestTagCount) { busiestTagCount = count; @@ -511,7 +511,7 @@ public: } }; - TagCounters tagCounters; + TransactionTagCounter transactionTagCounter; Optional latencyBandConfig; @@ -973,7 +973,7 @@ ACTOR Future getValueQ( StorageServer* data, GetValueRequest req ) { data->sendErrorWithPenalty(req.reply, e, data->getPenalty()); } - data->tagCounters.increment(req.tags, (resultSize / 4096) + 1); + data->transactionTagCounter.increment(req.tags, (resultSize / 4096) + 1); ++data->counters.finishedQueries; --data->readQueueSizeMetric; @@ -1519,7 +1519,7 @@ ACTOR Future getKeyValues( StorageServer* data, GetKeyValuesRequest req ) data->sendErrorWithPenalty(req.reply, e, data->getPenalty()); } - data->tagCounters.increment(req.tags, (resultSize / 4096) + 1); + data->transactionTagCounter.increment(req.tags, (resultSize / 4096) + 1); ++data->counters.finishedQueries; --data->readQueueSizeMetric; @@ -1579,7 +1579,7 @@ ACTOR Future getKey( StorageServer* data, GetKeyRequest req ) { data->sendErrorWithPenalty(req.reply, e, data->getPenalty()); } - data->tagCounters.increment(req.tags, (resultSize/4096) + 1); + data->transactionTagCounter.increment(req.tags, (resultSize/4096) + 1); ++data->counters.finishedQueries; --data->readQueueSizeMetric; if(data->latencyBandConfig.present()) { @@ -1607,8 +1607,8 @@ void getQueuingMetrics( StorageServer* self, StorageQueuingMetricsRequest const& reply.diskUsage = self->diskUsage; reply.durableVersion = self->durableVersion.get(); - Optional busiestTag = self->tagCounters.getBusiestTag(); - reply.busiestTag = busiestTag.map>([](StorageServer::TagCounters::TagInfo tagInfo) { return tagInfo.tag; }); + Optional busiestTag = self->transactionTagCounter.getBusiestTag(); + reply.busiestTag = busiestTag.map([](StorageServer::TransactionTagCounter::TagInfo tagInfo) { return tagInfo.tag; }); reply.busiestTagFractionalBusyness = busiestTag.present() ? busiestTag.get().fractionalBusyness : 0.0; reply.busiestTagRate = busiestTag.present() ? busiestTag.get().count / busiestTag.get().elapsed : 0.0; @@ -3619,8 +3619,8 @@ ACTOR Future storageServerCore( StorageServer* self, StorageServerInterfac actors.add(logLongByteSampleRecovery(self->byteSampleRecovery)); actors.add(checkBehind(self)); - self->tagCounters.startNewInterval(self->thisServerID); - actors.add(recurring([this](){ self->tagCounters.startNewInterval(self->thisServerID); }, SERVER_KNOBS->TAG_MEASUREMENT_INTERVAL)); + self->transactionTagCounter.startNewInterval(self->thisServerID); + actors.add(recurring([this](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL)); self->coreStarted.send( Void() );