Reorganization of throttle storage in ratekeeper to support various auto-throttling related actions

This commit is contained in:
A.J. Beamon 2020-04-28 14:30:37 -07:00
parent 573b10f9f5
commit 0ed70accfa
3 changed files with 196 additions and 157 deletions

View File

@ -490,8 +490,9 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( MIN_TAG_BUSYNESS, 0.2 ); if(randomize && BUGGIFY) MIN_TAG_BUSYNESS = 0.0;
init( MIN_TAG_COST, 1000 ); if(randomize && BUGGIFY) MIN_TAG_COST = 0.0;
init( AUTO_THROTTLE_TARGET_TAG_BUSYNESS, 0.1 ); if(randomize && BUGGIFY) AUTO_THROTTLE_TARGET_TAG_BUSYNESS = 0.0;
init( TAG_AUTO_THROTTLE_DURATION, 120.0 ); if(randomize && BUGGIFY) TAG_AUTO_THROTTLE_DURATION = 5.0;
init( AUTO_TAG_THROTTLE_DURATION, 120.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_DURATION = 5.0;
init( TAG_THROTTLE_PUSH_INTERVAL, 1.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_PUSH_INTERVAL = 0.0;
init( AUTO_TAG_THROTTLE_START_AGGREGATION_TIME, 5.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_START_AGGREGATION_TIME = 0.5;
init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
//Storage Metrics

View File

@ -396,8 +396,9 @@ public:
double MIN_TAG_BUSYNESS;
double MIN_TAG_COST;
double AUTO_THROTTLE_TARGET_TAG_BUSYNESS;
double TAG_AUTO_THROTTLE_DURATION;
double AUTO_TAG_THROTTLE_DURATION;
double TAG_THROTTLE_PUSH_INTERVAL;
double AUTO_TAG_THROTTLE_START_AGGREGATION_TIME;
bool AUTO_TAG_THROTTLING_ENABLED;
double MAX_TRANSACTIONS_PER_BYTE;

View File

@ -128,158 +128,223 @@ struct TLogQueueInfo {
}
};
class RkTagThrottleData : NonCopyable {
class RkTagThrottleCollection : NonCopyable {
private:
struct PriorityThrottleData {
// Per-tag request statistics.
struct RkTagData {
	// Smoothed request counter for the tag; its smoothing window comes from
	// CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW.
	Smoother requestRate;

	RkTagData()
	  : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW)
	{}
};
struct RkTagThrottleData {
ClientTagThrottleLimits limits;
Smoother clientRate;
double expiration = 0;
double created = now();
Optional<ClientTagThrottleLimits> autoThrottleData;
Optional<ClientTagThrottleLimits> manualThrottleData;
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
PriorityThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
double getTargetRate(ClientTagThrottleLimits const& tagThrottleLimits, double requestRate) {
if(tagThrottleLimits.tpsRate == 0.0 || requestRate == 0.0) {
return tagThrottleLimits.tpsRate;
double getTargetRate(Optional<double> requestRate) {
if(limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0) {
return limits.tpsRate;
}
else {
return std::min(tagThrottleLimits.tpsRate, (tagThrottleLimits.tpsRate / requestRate) * clientRate.smoothTotal());
return std::min(limits.tpsRate, (limits.tpsRate / requestRate.get()) * clientRate.smoothTotal());
}
}
double updateAndGetClientRate(double requestRate) {
double newClientRate = std::numeric_limits<double>::max();
if(autoThrottleData.present()) {
if(autoThrottleData.get().expiration > now()) {
newClientRate = getTargetRate(autoThrottleData.get(), requestRate);
expiration = autoThrottleData.get().expiration;
}
else {
autoThrottleData.reset();
}
Optional<double> updateAndGetClientRate(Optional<double> requestRate) {
if(limits.expiration > now()) {
clientRate.setTotal(getTargetRate(requestRate));
return clientRate.smoothTotal();
}
if(manualThrottleData.present()) {
if(manualThrottleData.get().expiration > now()) {
double manualTargetRate = getTargetRate(manualThrottleData.get(), requestRate);
if(manualTargetRate < newClientRate) {
newClientRate = manualTargetRate;
expiration = manualThrottleData.get().expiration;
}
}
else {
manualThrottleData.reset();
}
else {
return Optional<double>();
}
if(newClientRate == std::numeric_limits<double>::max()) {
return newClientRate;
}
clientRate.setTotal(newClientRate);
return clientRate.smoothTotal();
}
};
Smoother smoothRequests;
public:
std::map<TransactionPriority, PriorityThrottleData> throttleData;
RkTagThrottleCollection() {}
RkTagThrottleData() : smoothRequests(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
// Move constructor: takes over all three tag maps from `other`.
//
// The maps are move-constructed directly in the member-initializer list
// (in declaration order) instead of being default-constructed and then
// move-assigned.
// NOTE(review): presumably written by hand because the NonCopyable base
// prevents a defaulted move constructor - confirm before replacing with `= default`.
RkTagThrottleCollection(RkTagThrottleCollection &&other)
  : autoThrottledTags(std::move(other.autoThrottledTags)),
    manualThrottledTags(std::move(other.manualThrottledTags)),
    tagData(std::move(other.tagData))
{}
void operator=(RkTagThrottleCollection &&other) {
autoThrottledTags = std::move(other.autoThrottledTags);
manualThrottledTags = std::move(other.manualThrottledTags);
tagData = std::move(other.tagData);
}
// Inserts or updates a throttle
void insertOrUpdateThrottle(TransactionTagRef tag, TransactionPriority priority, bool autoThrottle, double tpsRate, double expiration) {
// Creates or refreshes the auto throttle for `tag` and returns the TPS limit that was applied.
//
// tag                - tag to throttle; an entry is created in autoThrottledTags if absent.
// fractionalBusyness - busyness fraction used to derive a rate; only read when `tpsRate` is not supplied.
// tpsRate            - explicit limit to apply. When absent, the limit is derived from the tag's
//                      smoothed request rate once the throttle has existed for longer than
//                      AUTO_TAG_THROTTLE_START_AGGREGATION_TIME, and is 1e7 before that.
// expiration         - explicit expiration time. When absent, now() + AUTO_TAG_THROTTLE_DURATION.
//
// Asserts that the throttle yields a client rate after the update, i.e. that it has not already expired.
double autoThrottleTag(TransactionTag const& tag, double fractionalBusyness, Optional<double> tpsRate = Optional<double>(), Optional<double> expiration = Optional<double>()) {
	auto &tagThrottle = autoThrottledTags[tag];

	if(!tpsRate.present()) {
		// TODO: limit update frequency
		const bool aggregationElapsed = now() > tagThrottle.created + SERVER_KNOBS->AUTO_TAG_THROTTLE_START_AGGREGATION_TIME;
		if(!aggregationElapsed) {
			// Throttle is too recent to compute a rate from; start with a very high limit.
			tpsRate = 1e7;
		}
		else {
			double observedRate = tagData[tag].requestRate.smoothRate();
			double scale = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS * (1-fractionalBusyness) / ((1-SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS) * fractionalBusyness);

			// A computed rate never exceeds the limit already stored on the throttle.
			tpsRate = std::min(observedRate * scale, tagThrottle.limits.tpsRate);
			// TODO: smooth ramp up
		}
	}

	if(!expiration.present()) {
		expiration = now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION;
	}

	TraceEvent("RkSetAutoThrottle")
		.detail("Tag", tag)
		.detail("TargetRate", tpsRate.get())
		.detail("Expiration", expiration.get() - now());

	tagThrottle.limits.tpsRate = tpsRate.get();
	tagThrottle.limits.expiration = expiration.get();

	// Push the new limits into the throttle's smoothed client rate right away.
	Optional<double> updatedClientRate = tagThrottle.updateAndGetClientRate(getRequestRate(tag));
	ASSERT(updatedClientRate.present());

	return tpsRate.get();
}
void insertOrUpdateManualThrottle(TransactionTag const& tag, TransactionPriority priority, double tpsRate, double expiration) {
ASSERT(tpsRate >= 0);
ASSERT(expiration > now());
auto &priorityThrottleData = throttleData[priority];
auto &priorityThrottleMap = manualThrottledTags[tag];
auto result = priorityThrottleMap.try_emplace(priority);
Optional<ClientTagThrottleLimits> oldThrottleData;
if(autoThrottle) {
oldThrottleData = priorityThrottleData.autoThrottleData;
// Auto throttle rates cannot be increased while active
if(oldThrottleData.present()) {
tpsRate = std::min(oldThrottleData.get().tpsRate, tpsRate);
}
priorityThrottleData.autoThrottleData = ClientTagThrottleLimits(tpsRate, expiration);
}
else {
oldThrottleData = priorityThrottleData.manualThrottleData;
priorityThrottleData.manualThrottleData = ClientTagThrottleLimits(tpsRate, expiration);
if(!result.second) {
oldThrottleData = result.first->second.limits;
}
result.first->second.limits.tpsRate = tpsRate;
result.first->second.limits.expiration = expiration;
if(!oldThrottleData.present()) {
TraceEvent("RatekeeperAddingThrottle")
TraceEvent("RatekeeperAddingManualThrottle")
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now())
.detail("AutoThrottled", autoThrottle);
.detail("SecondsToExpiration", expiration - now());
}
else if(oldThrottleData.get().tpsRate != tpsRate || oldThrottleData.get().expiration != expiration) {
TraceEvent("RatekeeperUpdatingThrottle")
TraceEvent("RatekeeperUpdatingManualThrottle")
.detail("Tag", tag)
.detail("Rate", tpsRate)
.detail("Priority", transactionPriorityToString(priority))
.detail("SecondsToExpiration", expiration - now())
.detail("AutoThrottled", autoThrottle);
.detail("SecondsToExpiration", expiration - now());
}
double clientRate = priorityThrottleData.updateAndGetClientRate(smoothRequests.smoothRate());
ASSERT(clientRate != std::numeric_limits<double>::max());
Optional<double> clientRate = result.first->second.updateAndGetClientRate(getRequestRate(tag));
ASSERT(clientRate.present());
}
// Remove the specified throttle and returns true if this tag still has throttles present
bool eraseThrottle(TransactionPriority priority, bool autoThrottle) {
auto itr = throttleData.find(priority);
if(itr != throttleData.end()) {
bool erase = false;
if(autoThrottle) {
itr->second.autoThrottleData.reset();
erase = !itr->second.manualThrottleData.present();
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
for(auto tagItr = tagData.begin(); tagItr != tagData.end();) {
bool tagPresent = false;
double requestRate = tagItr->second.requestRate.smoothRate();
auto manualItr = manualThrottledTags.find(tagItr->first);
if(manualItr != manualThrottledTags.end()) {
Optional<ClientTagThrottleLimits> manualClientRate;
for(auto priority = allTransactionPriorities.rbegin(); priority != allTransactionPriorities.rend(); ++priority) {
auto priorityItr = manualItr->second.find(*priority);
if(priorityItr != manualItr->second.end()) {
Optional<double> priorityClientRate = priorityItr->second.updateAndGetClientRate(requestRate);
if(!priorityClientRate.present()) {
priorityItr = manualItr->second.erase(priorityItr);
}
else if(!manualClientRate.present() || manualClientRate.get().tpsRate > priorityClientRate.get()) {
manualClientRate = ClientTagThrottleLimits(priorityClientRate.get(), priorityItr->second.limits.expiration);
++priorityItr;
}
}
if(manualItr->second.empty()) {
manualThrottledTags.erase(manualItr);
}
else {
tagPresent = true;
}
if(manualClientRate.present()) {
clientRates[*priority][tagItr->first] = manualClientRate.get();
}
}
}
auto autoItr = autoThrottledTags.find(tagItr->first);
if(autoItr != autoThrottledTags.end()) {
Optional<double> autoClientRate = autoItr->second.updateAndGetClientRate(requestRate);
if(autoClientRate.present()) {
tagPresent = true;
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(tagItr->first, autoClientRate.get(), autoItr->second.limits.expiration);
if(!result.second && result.first->second.tpsRate > autoClientRate.get()) {
result.first->second = ClientTagThrottleLimits(autoClientRate.get(), autoItr->second.limits.expiration);
}
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
}
else {
autoThrottledTags.erase(autoItr);
}
}
if(!tagPresent) {
tagItr = tagData.erase(tagItr);
}
else {
itr->second.manualThrottleData.reset();
erase = !itr->second.autoThrottleData.present();
}
if(erase) {
throttleData.erase(itr);
}
return !throttleData.empty();
}
}
Optional<std::pair<double, double>> getClientRate(TransactionPriority minPriority) {
Optional<std::pair<double, double>> clientRate;
double requestRate = smoothRequests.smoothRate();
for(auto itr = throttleData.lower_bound(minPriority); itr != throttleData.end();) {
double priorityClientRate = itr->second.updateAndGetClientRate(requestRate);
if(!clientRate.present() || clientRate.get().first > priorityClientRate) {
clientRate = std::make_pair(priorityClientRate, itr->second.expiration);
}
if(priorityClientRate == std::numeric_limits<double>::max()) {
itr = throttleData.erase(itr);
}
else {
++itr;
++tagItr;
}
}
return clientRate;
return clientRates;
}
void addRequests(int requests) {
smoothRequests.addDelta(requests);
getClientRate(TransactionPriority::BATCH); // Update client rates based on new request rate
// Records `requests` additional requests for `tag` (creating the tag's entry in
// tagData if needed) and then refreshes the client rate of the tag's auto
// throttle and of each of its manual throttles using the new smoothed request rate.
void addRequests(TransactionTag const& tag, int requests) {
	RkTagData &stats = tagData.try_emplace(tag).first->second;
	stats.requestRate.addDelta(requests);

	double requestRate = stats.requestRate.smoothRate();

	auto autoThrottle = autoThrottledTags.find(tag);
	if(autoThrottle != autoThrottledTags.end()) {
		autoThrottle->second.updateAndGetClientRate(requestRate);
	}

	auto manualThrottles = manualThrottledTags.find(tag);
	if(manualThrottles != manualThrottledTags.end()) {
		// One manual throttle may exist per priority; refresh each of them.
		for(auto &priorityThrottle : manualThrottles->second) {
			priorityThrottle.second.updateAndGetClientRate(requestRate);
		}
	}
}
double getRequestRate() {
return smoothRequests.smoothRate();
// Returns the smoothed request rate recorded for `tag`, or an empty Optional
// when the tag has no entry in tagData.
Optional<double> getRequestRate(TransactionTag const& tag) {
	auto entry = tagData.find(tag);
	if(entry == tagData.end()) {
		return Optional<double>();
	}

	return entry->second.requestRate.smoothRate();
}
// Number of tags that currently have an entry in autoThrottledTags.
int64_t autoThrottleCount() const {
	return static_cast<int64_t>(autoThrottledTags.size());
}
int64_t manualThrottleCount() const {
return autoThrottledTags.size();
}
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
TransactionTagMap<RkTagData> tagData;
};
struct RatekeeperLimits {
@ -344,7 +409,7 @@ struct RatekeeperData {
double lastWarning;
double lastSSListFetchedTimestamp;
TransactionTagMap<RkTagThrottleData> throttledTags;
RkTagThrottleCollection throttledTags;
uint64_t throttledTagChangeId;
RatekeeperLimits normalLimits;
@ -564,7 +629,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
}
TransactionTagMap<RkTagThrottleData> updatedTagThrottles;
RkTagThrottleCollection updatedTagThrottles;
TraceEvent("RatekeeperReadThrottledTags").detail("NumThrottledTags", throttledTagKeys.get().size());
for(auto entry : throttledTagKeys.get()) {
@ -587,9 +652,14 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
}
else {
TransactionTag tag = *tagKey.tags.begin();
auto itr = self->throttledTags.find(tag);
auto result = updatedTagThrottles.try_emplace(tag);
result.first->second.insertOrUpdateThrottle(tag, tagKey.priority, tagKey.autoThrottled, tagValue.tpsRate, tagValue.expirationTime);
//auto itr = self->throttledTags.find(tag); // TODO: logging of changes/additions?
if(tagKey.autoThrottled) {
updatedTagThrottles.autoThrottleTag(tag, 0, tagValue.tpsRate, tagValue.expirationTime);
}
else {
updatedTagThrottles.insertOrUpdateManualThrottle(tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime);
}
}
}
@ -609,35 +679,22 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
}
// TODO: limit the number of throttles active for a storage server
// TODO: correctly limit auto throttled count
// TODO: allow adjusting existing throttle
// TODO: wait for all proxies to report before throttling down
void autoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, TransactionTagMap<RkTagThrottleData>& throttledTags) {
void autoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagThrottleCollection& throttledTags) {
if(ss.busiestTag.present() && ss.busiestTagFractionalBusyness > SERVER_KNOBS->MIN_TAG_BUSYNESS
&& ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST && throttledTags.size() <= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS) {
auto &throttle = throttledTags[ss.busiestTag.get()];
double targetRate = 1e7;
double requestRate = throttle.getRequestRate();
if(requestRate != 0) { // TODO: figure out this condition
double targetFraction = SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS * (1-ss.busiestTagFractionalBusyness) / ((1-SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS) * ss.busiestTagFractionalBusyness);
targetRate = requestRate * targetFraction;
}
TraceEvent("RkSetAutoThrottle").detail("TargetRate", targetRate);
throttle.insertOrUpdateThrottle(ss.busiestTag.get(), TransactionPriority::DEFAULT, true, targetRate, now() + SERVER_KNOBS->TAG_AUTO_THROTTLE_DURATION);
throttle.insertOrUpdateThrottle(ss.busiestTag.get(), TransactionPriority::BATCH, true, 0, now() + SERVER_KNOBS->TAG_AUTO_THROTTLE_DURATION);
&& ss.busiestTagRate > SERVER_KNOBS->MIN_TAG_COST && throttledTags.autoThrottleCount() <= SERVER_KNOBS->MAX_AUTO_THROTTLED_TRANSACTION_TAGS)
{
double clientRate = self->throttledTags.autoThrottleTag(ss.busiestTag.get(), ss.busiestTagFractionalBusyness);
TagSet tags;
tags.addTag(ss.busiestTag.get());
// TODO: same transaction?
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, targetRate, SERVER_KNOBS->TAG_AUTO_THROTTLE_DURATION, true, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->TAG_AUTO_THROTTLE_DURATION));
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, 0, SERVER_KNOBS->TAG_AUTO_THROTTLE_DURATION, true, TransactionPriority::BATCH, now() + SERVER_KNOBS->TAG_AUTO_THROTTLE_DURATION));
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate, SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, true, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
}
}
void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TransactionTagMap<RkTagThrottleData>& throttledTags) {
void updateRate(RatekeeperData* self, RatekeeperLimits* limits, RkTagThrottleCollection& throttledTags) {
//double controlFactor = ; // dt / eFoldingTime
double actualTps = self->smoothReleasedTransactions.smoothRate();
@ -704,7 +761,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TransactionTagMa
double targetRateRatio = std::min(( storageQueue - targetBytes + springBytes ) / (double)springBytes, 2.0);
TraceEvent("RkCheckingAutoThrottle").detail("StorageQueue", storageQueue).detail("BusiestTag", ss.busiestTag.present() ? ss.busiestTag.get() : LiteralStringRef("<none>")).detail("FractionalBusyness", ss.busiestTagFractionalBusyness).detail("BusiestTagRate", ss.busiestTagRate).detail("ThrottledTags", throttledTags.size());
TraceEvent("RkCheckingAutoThrottle").detail("StorageQueue", storageQueue).detail("BusiestTag", ss.busiestTag.present() ? ss.busiestTag.get() : LiteralStringRef("<none>")).detail("FractionalBusyness", ss.busiestTagFractionalBusyness).detail("BusiestTagRate", ss.busiestTagRate).detail("AutoThrottledTags", throttledTags.autoThrottleCount());
if(limits->priority == TransactionPriority::DEFAULT && (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS)) {
autoThrottleTag(self, ss, throttledTags);
}
@ -976,7 +1033,8 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TransactionTagMa
.detail("LimitingStorageServerVersionLag", limitingVersionLag)
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
.detail("TagsThrottled", throttledTags.size())
.detail("TagsAutoThrottled", throttledTags.autoThrottleCount())
.detail("TagsManuallyThrottled", throttledTags.manualThrottleCount())
.trackLatest(name);
}
}
@ -1061,10 +1119,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.totalTransactions );
for(auto tag : req.throttledTagCounts) {
auto itr = self.throttledTags.find(tag.first);
if(itr != self.throttledTags.end()) {
itr->second.addRequests(tag.second);
}
self.throttledTags.addRequests(tag.first, tag.second);
}
}
if(p.batchTransactions > 0) {
@ -1083,25 +1138,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
p.lastThrottledTagChangeId = self.throttledTagChangeId;
p.lastTagPushTime = now();
reply.throttledTags = PrioritizedTransactionTagMap<ClientTagThrottleLimits>();
for(auto itr = self.throttledTags.begin(); itr != self.throttledTags.end();) {
for(auto priority : allTransactionPriorities) {
Optional<std::pair<double, double>> clientRate = itr->second.getClientRate(priority);
if(clientRate.present()) {
auto &priorityTags = reply.throttledTags.get()[priority];
priorityTags.try_emplace(itr->first, ClientTagThrottleLimits(clientRate.get().first, clientRate.get().second));
}
// Handle throttle expiration. We expire a throttle if no rate is returned for the lowest priority,
// which means that no throttles are active at any priority.
else if(priority == TransactionPriority::MIN) {
itr = self.throttledTags.erase(itr);
break;
}
}
++itr;
}
reply.throttledTags = self.throttledTags.getClientRates();
}
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);