Initial support for ramping load back up. Fix some logging. Update auto-throttles less frequently.
This commit is contained in:
parent
0ed70accfa
commit
b80225dde0
|
@ -487,12 +487,13 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
|
||||
init( MAX_AUTO_THROTTLED_TRANSACTION_TAGS, 5 ); if(randomize && BUGGIFY) MAX_AUTO_THROTTLED_TRANSACTION_TAGS = 1;
|
||||
init( MAX_MANUAL_THROTTLED_TRANSACTION_TAGS, 100 ); if(randomize && BUGGIFY) MAX_MANUAL_THROTTLED_TRANSACTION_TAGS = 1;
|
||||
init( MIN_TAG_BUSYNESS, 0.2 ); if(randomize && BUGGIFY) MIN_TAG_BUSYNESS = 0.0;
|
||||
init( MIN_TAG_COST, 1000 ); if(randomize && BUGGIFY) MIN_TAG_COST = 0.0;
|
||||
init( AUTO_THROTTLE_TARGET_TAG_BUSYNESS, 0.1 ); if(randomize && BUGGIFY) AUTO_THROTTLE_TARGET_TAG_BUSYNESS = 0.0;
|
||||
init( AUTO_TAG_THROTTLE_DURATION, 120.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_DURATION = 5.0;
|
||||
init( AUTO_TAG_THROTTLE_RAMP_UP_TIME, 120.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_RAMP_UP_TIME = 5.0;
|
||||
init( AUTO_TAG_THROTTLE_DURATION, 240.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_DURATION = 20.0;
|
||||
init( TAG_THROTTLE_PUSH_INTERVAL, 1.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_PUSH_INTERVAL = 0.0;
|
||||
init( AUTO_TAG_THROTTLE_START_AGGREGATION_TIME, 5.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_START_AGGREGATION_TIME = 0.5;
|
||||
init( AUTO_TAG_THROTTLE_UPDATE_FREQUENCY, 10.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_UPDATE_FREQUENCY = 0.5;
|
||||
init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
|
||||
|
||||
//Storage Metrics
|
||||
|
|
|
@ -393,12 +393,14 @@ public:
|
|||
|
||||
int64_t MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
|
||||
int64_t MAX_AUTO_THROTTLED_TRANSACTION_TAGS;
|
||||
double MIN_TAG_BUSYNESS;
|
||||
double MIN_TAG_COST;
|
||||
double AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
|
||||
double AUTO_THROTTLE_RAMP_TAG_BUSYNESS;
|
||||
double AUTO_TAG_THROTTLE_RAMP_UP_TIME;
|
||||
double AUTO_TAG_THROTTLE_DURATION;
|
||||
double TAG_THROTTLE_PUSH_INTERVAL;
|
||||
double AUTO_TAG_THROTTLE_START_AGGREGATION_TIME;
|
||||
double AUTO_TAG_THROTTLE_UPDATE_FREQUENCY;
|
||||
bool AUTO_TAG_THROTTLING_ENABLED;
|
||||
|
||||
double MAX_TRANSACTIONS_PER_BYTE;
|
||||
|
|
|
@ -138,7 +138,11 @@ private:
|
|||
struct RkTagThrottleData {
|
||||
ClientTagThrottleLimits limits;
|
||||
Smoother clientRate;
|
||||
|
||||
// Only used by auto-throttles
|
||||
double created = now();
|
||||
double lastUpdated = now();
|
||||
double lastReduced = now();
|
||||
|
||||
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
|
||||
|
||||
|
@ -177,19 +181,37 @@ public:
|
|||
tagData = std::move(other.tagData);
|
||||
}
|
||||
|
||||
double autoThrottleTag(TransactionTag const& tag, double fractionalBusyness, Optional<double> tpsRate = Optional<double>(), Optional<double> expiration = Optional<double>()) {
|
||||
// Returns the TPS rate if the throttle is updated, otherwise returns an empty optional
|
||||
Optional<double> autoThrottleTag(TransactionTag const& tag, double fractionalBusyness, Optional<double> tpsRate = Optional<double>(), Optional<double> expiration = Optional<double>()) {
|
||||
auto &throttle = autoThrottledTags[tag];
|
||||
|
||||
if(!tpsRate.present()) {
|
||||
// TODO: limit update frequency
|
||||
if(now() > throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
|
||||
double requestRate = tagData[tag].requestRate.smoothRate();
|
||||
double targetFraction = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS * (1-fractionalBusyness) / ((1-SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS) * fractionalBusyness);
|
||||
tpsRate = std::min(requestRate * targetFraction, throttle.limits.tpsRate);
|
||||
// TODO: smooth ramp up
|
||||
if(now() <= throttle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME) {
|
||||
tpsRate = 1e7;
|
||||
}
|
||||
else if(now() <= throttle.lastUpdated + SERVER_KNOBS->AUTO_TAG_THROTTLE_UPDATE_FREQUENCY) {
|
||||
return Optional<double>();
|
||||
}
|
||||
else {
|
||||
tpsRate = 1e7;
|
||||
double requestRate = tagData[tag].requestRate.smoothRate();
|
||||
|
||||
double targetBusyness = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
|
||||
if(now() >= throttle.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) {
|
||||
double rampLocation = (now() - throttle.lastReduced - SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME) / SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
|
||||
targetBusyness = pow(targetBusyness, 1 - rampLocation);
|
||||
}
|
||||
|
||||
double targetFraction = targetBusyness * (1-fractionalBusyness) / ((1-targetBusyness) * fractionalBusyness);
|
||||
tpsRate = requestRate * targetFraction;
|
||||
|
||||
if(now() <= throttle.lastReduced + SERVER_KNOBS->AUTO_TAG_THROTTLE_RAMP_UP_TIME && tpsRate.get() >= throttle.limits.tpsRate) {
|
||||
return Optional<double>();
|
||||
}
|
||||
|
||||
throttle.lastUpdated = now();
|
||||
if(tpsRate.get() < throttle.limits.tpsRate) {
|
||||
throttle.lastReduced = now();
|
||||
}
|
||||
}
|
||||
}
|
||||
if(!expiration.present()) {
|
||||
|
@ -210,29 +232,25 @@ public:
|
|||
return tpsRate.get();
|
||||
}
|
||||
|
||||
void insertOrUpdateManualThrottle(TransactionTag const& tag, TransactionPriority priority, double tpsRate, double expiration) {
|
||||
void manualThrottleTag(TransactionTag const& tag, TransactionPriority priority, double tpsRate, double expiration, Optional<ClientTagThrottleLimits> const& oldLimits) {
|
||||
ASSERT(tpsRate >= 0);
|
||||
ASSERT(expiration > now());
|
||||
|
||||
auto &priorityThrottleMap = manualThrottledTags[tag];
|
||||
auto result = priorityThrottleMap.try_emplace(priority);
|
||||
ASSERT(result.second); // Updating to the map is done by copying the whole map
|
||||
|
||||
Optional<ClientTagThrottleLimits> oldThrottleData;
|
||||
if(!result.second) {
|
||||
oldThrottleData = result.first->second.limits;
|
||||
}
|
||||
|
||||
result.first->second.limits.tpsRate = tpsRate;
|
||||
result.first->second.limits.expiration = expiration;
|
||||
|
||||
if(!oldThrottleData.present()) {
|
||||
if(!oldLimits.present()) {
|
||||
TraceEvent("RatekeeperAddingManualThrottle")
|
||||
.detail("Tag", tag)
|
||||
.detail("Rate", tpsRate)
|
||||
.detail("Priority", transactionPriorityToString(priority))
|
||||
.detail("SecondsToExpiration", expiration - now());
|
||||
}
|
||||
else if(oldThrottleData.get().tpsRate != tpsRate || oldThrottleData.get().expiration != expiration) {
|
||||
else if(oldLimits.get().tpsRate != tpsRate || oldLimits.get().expiration != expiration) {
|
||||
TraceEvent("RatekeeperUpdatingManualThrottle")
|
||||
.detail("Tag", tag)
|
||||
.detail("Rate", tpsRate)
|
||||
|
@ -244,6 +262,18 @@ public:
|
|||
ASSERT(clientRate.present());
|
||||
}
|
||||
|
||||
Optional<ClientTagThrottleLimits> getManualTagThrottleLimits(TransactionTag const& tag, TransactionPriority priority) {
|
||||
auto itr = manualThrottledTags.find(tag);
|
||||
if(itr != manualThrottledTags.end()) {
|
||||
auto priorityItr = itr->second.find(priority);
|
||||
if(priorityItr != itr->second.end()) {
|
||||
return priorityItr->second.limits;
|
||||
}
|
||||
}
|
||||
|
||||
return Optional<ClientTagThrottleLimits>();
|
||||
}
|
||||
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
|
||||
|
||||
|
@ -652,13 +682,13 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
|||
}
|
||||
else {
|
||||
TransactionTag tag = *tagKey.tags.begin();
|
||||
//auto itr = self->throttledTags.find(tag); // TODO: logging of changes/additions?
|
||||
Optional<ClientTagThrottleLimits> oldLimits = self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority);
|
||||
|
||||
if(tagKey.autoThrottled) {
|
||||
updatedTagThrottles.autoThrottleTag(tag, 0, tagValue.tpsRate, tagValue.expirationTime);
|
||||
}
|
||||
else {
|
||||
updatedTagThrottles.insertOrUpdateManualThrottle(tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime);
|
||||
updatedTagThrottles.manualThrottleTag(tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime, oldLimits);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -678,19 +708,17 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: limit the number of throttles active for a storage server
|
||||
// TODO: allow adjusting existing throttle
|
||||
// TODO: wait for all proxies to report before throttling down
|
||||
void autoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagThrottleCollection& throttledTags) {
|
||||
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->MIN_TAG_BUSYNESS
|
||||
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS
|
||||
&& ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST && throttledTags.autoThrottleCount() <= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS)
|
||||
{
|
||||
double clientRate = self->throttledTags.autoThrottleTag(ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
|
||||
Optional<double> clientRate = self->throttledTags.autoThrottleTag(ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
|
||||
if(clientRate.present()) {
|
||||
TagSet tags;
|
||||
tags.addTag(ss.busiestTag.get());
|
||||
|
||||
TagSet tags;
|
||||
tags.addTag(ss.busiestTag.get());
|
||||
|
||||
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate, SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, true, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
|
||||
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, true, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue