Move tryAutoThrottleTag method to TagThrottler
This commit is contained in:
parent
b80f89204a
commit
58669717f1
|
@ -573,7 +573,7 @@ void Ratekeeper::updateRate(RatekeeperLimits* limits) {
|
|||
double targetRateRatio = std::min((storageQueue - targetBytes + springBytes) / (double)springBytes, 2.0);
|
||||
|
||||
if (limits->priority == TransactionPriority::DEFAULT) {
|
||||
tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag);
|
||||
addActor.send(tagThrottler->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag));
|
||||
}
|
||||
|
||||
double inputRate = ss.smoothInputBytes.smoothRate();
|
||||
|
@ -1011,50 +1011,6 @@ Future<Void> Ratekeeper::refreshStorageServerCommitCost() {
|
|||
return Void();
|
||||
}
|
||||
|
||||
void Ratekeeper::tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
|
||||
// NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE
|
||||
// currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs.
|
||||
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
|
||||
TEST(true); // Transaction tag auto-throttled
|
||||
Optional<double> clientRate = tagThrottler->autoThrottleTag(id, tag, busyness);
|
||||
if (clientRate.present()) {
|
||||
TagSet tags;
|
||||
tags.addTag(tag);
|
||||
|
||||
Reference<DatabaseContext> dbRef = Reference<DatabaseContext>::addRef(db.getPtr());
|
||||
addActor.send(ThrottleApi::throttleTags(dbRef,
|
||||
tags,
|
||||
clientRate.get(),
|
||||
SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
|
||||
TagThrottleType::AUTO,
|
||||
TransactionPriority::DEFAULT,
|
||||
now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
|
||||
reason));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Ratekeeper::tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQueue, int64_t storageDurabilityLag) {
|
||||
// NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In most
|
||||
// of situation, this works. More indicators besides queue size and durability lag could be investigated in the
|
||||
// future
|
||||
if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
|
||||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
|
||||
if (ss.busiestWriteTag.present()) {
|
||||
tryAutoThrottleTag(ss.busiestWriteTag.get(),
|
||||
ss.busiestWriteTagRate,
|
||||
ss.busiestWriteTagFractionalBusyness,
|
||||
TagThrottledReason::BUSY_WRITE);
|
||||
}
|
||||
if (ss.busiestReadTag.present()) {
|
||||
tryAutoThrottleTag(ss.busiestReadTag.get(),
|
||||
ss.busiestReadTagRate,
|
||||
ss.busiestReadTagFractionalBusyness,
|
||||
TagThrottledReason::BUSY_READ);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
wait(Ratekeeper::run(rkInterf, dbInfo));
|
||||
return Void();
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
#include "fdbserver/TagThrottler.h"
|
||||
|
||||
class RkTagThrottleCollection : NonCopyable {
|
||||
private:
|
||||
struct RkTagData {
|
||||
Smoother requestRate;
|
||||
RkTagData() : requestRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
|
||||
|
@ -495,6 +494,34 @@ class TagThrottlerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
Optional<double> autoThrottleTag(UID id, TransactionTag tag, double busyness) {
|
||||
return throttledTags.autoThrottleTag(id, tag, busyness);
|
||||
}
|
||||
|
||||
Future<Void> tryAutoThrottleTag(TransactionTag tag, double rate, double busyness, TagThrottledReason reason) {
|
||||
// NOTE: before the comparison with MIN_TAG_COST, the busiest tag rate also compares with MIN_TAG_PAGES_RATE
|
||||
// currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs.
|
||||
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
|
||||
TEST(true); // Transaction tag auto-throttled
|
||||
Optional<double> clientRate = autoThrottleTag(ratekeeper->id, tag, busyness);
|
||||
if (clientRate.present()) {
|
||||
TagSet tags;
|
||||
tags.addTag(tag);
|
||||
|
||||
Reference<DatabaseContext> dbRef = Reference<DatabaseContext>::addRef(ratekeeper->db.getPtr());
|
||||
return ThrottleApi::throttleTags(dbRef,
|
||||
tags,
|
||||
clientRate.get(),
|
||||
SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
|
||||
TagThrottleType::AUTO,
|
||||
TransactionPriority::DEFAULT,
|
||||
now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
|
||||
reason);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
public:
|
||||
TagThrottlerImpl(Ratekeeper* ratekeeper) : ratekeeper(ratekeeper) {}
|
||||
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
|
||||
|
@ -509,9 +536,29 @@ public:
|
|||
uint32_t busyWriteTagCount() const { return throttledTags.busyWriteTagCount; }
|
||||
int64_t manualThrottleCount() const { return throttledTags.manualThrottleCount(); }
|
||||
bool isAutoThrottlingEnabled() const { return autoThrottlingEnabled; }
|
||||
Optional<double> autoThrottleTag(UID id, TransactionTag tag, double busyness) {
|
||||
return throttledTags.autoThrottleTag(id, tag, busyness);
|
||||
|
||||
Future<Void> tryAutoThrottleTag(StorageQueueInfo& ss, int64_t storageQueue, int64_t storageDurabilityLag) {
|
||||
// NOTE: we just keep it simple and don't differentiate write-saturation and read-saturation at the moment. In
|
||||
// most of situation, this works. More indicators besides queue size and durability lag could be investigated in
|
||||
// the future
|
||||
if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
|
||||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
|
||||
if (ss.busiestWriteTag.present()) {
|
||||
return tryAutoThrottleTag(ss.busiestWriteTag.get(),
|
||||
ss.busiestWriteTagRate,
|
||||
ss.busiestWriteTagFractionalBusyness,
|
||||
TagThrottledReason::BUSY_WRITE);
|
||||
}
|
||||
if (ss.busiestReadTag.present()) {
|
||||
return tryAutoThrottleTag(ss.busiestReadTag.get(),
|
||||
ss.busiestReadTagRate,
|
||||
ss.busiestReadTagFractionalBusyness,
|
||||
TagThrottledReason::BUSY_READ);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
}; // class TagThrottlerImpl
|
||||
|
||||
TagThrottler::TagThrottler(Ratekeeper* ratekeeper) : impl(PImpl<TagThrottlerImpl>::create(ratekeeper)) {}
|
||||
|
@ -543,6 +590,8 @@ int64_t TagThrottler::manualThrottleCount() const {
|
|||
bool TagThrottler::isAutoThrottlingEnabled() const {
|
||||
return impl->isAutoThrottlingEnabled();
|
||||
}
|
||||
Optional<double> TagThrottler::autoThrottleTag(UID id, TransactionTag tag, double busyness) {
|
||||
return impl->autoThrottleTag(id, tag, busyness);
|
||||
Future<Void> TagThrottler::tryAutoThrottleTag(StorageQueueInfo& ss,
|
||||
int64_t storageQueue,
|
||||
int64_t storageDurabilityLag) {
|
||||
return impl->tryAutoThrottleTag(ss, storageQueue, storageDurabilityLag);
|
||||
}
|
||||
|
|
|
@ -38,5 +38,5 @@ public:
|
|||
uint32_t busyWriteTagCount() const;
|
||||
int64_t manualThrottleCount() const;
|
||||
bool isAutoThrottlingEnabled() const;
|
||||
Optional<double> autoThrottleTag(UID id, TransactionTag tag, double busyness);
|
||||
Future<Void> tryAutoThrottleTag(StorageQueueInfo&, int64_t storageQueue, int64_t storageDurabilityLag);
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue