Remove TagThrottler::ratekeeper field
This commit is contained in:
parent
182fb8de3d
commit
0f4c808f37
|
@ -464,7 +464,7 @@ Ratekeeper::Ratekeeper(UID id, Database db)
|
|||
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
|
||||
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
|
||||
lastBusiestCommitTagPick(0.0) {
|
||||
tagThrottler = std::make_unique<TagThrottler>(this);
|
||||
tagThrottler = std::make_unique<TagThrottler>(db, id);
|
||||
expiredTagThrottleCleanup = recurring([this]() { ThrottleApi::expire(this->db.getReference()); },
|
||||
SERVER_KNOBS->TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL);
|
||||
}
|
||||
|
|
|
@ -378,7 +378,8 @@ public:
|
|||
};
|
||||
|
||||
class TagThrottlerImpl {
|
||||
Ratekeeper const* ratekeeper;
|
||||
Database db;
|
||||
UID id;
|
||||
RkTagThrottleCollection throttledTags;
|
||||
uint64_t throttledTagChangeId{ 0 };
|
||||
bool autoThrottlingEnabled{ false };
|
||||
|
@ -386,7 +387,7 @@ class TagThrottlerImpl {
|
|||
ACTOR static Future<Void> monitorThrottlingChanges(TagThrottlerImpl* self) {
|
||||
state bool committed = false;
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(self->ratekeeper->db);
|
||||
state ReadYourWritesTransaction tr(self->db);
|
||||
|
||||
loop {
|
||||
try {
|
||||
|
@ -408,20 +409,20 @@ class TagThrottlerImpl {
|
|||
autoThrottlingEnabled.get().get() == LiteralStringRef("0")) {
|
||||
TEST(true); // Auto-throttling disabled
|
||||
if (self->autoThrottlingEnabled) {
|
||||
TraceEvent("AutoTagThrottlingDisabled", self->ratekeeper->id).log();
|
||||
TraceEvent("AutoTagThrottlingDisabled", self->id).log();
|
||||
}
|
||||
self->autoThrottlingEnabled = false;
|
||||
} else if (autoThrottlingEnabled.get().present() &&
|
||||
autoThrottlingEnabled.get().get() == LiteralStringRef("1")) {
|
||||
TEST(true); // Auto-throttling enabled
|
||||
if (!self->autoThrottlingEnabled) {
|
||||
TraceEvent("AutoTagThrottlingEnabled", self->ratekeeper->id).log();
|
||||
TraceEvent("AutoTagThrottlingEnabled", self->id).log();
|
||||
}
|
||||
self->autoThrottlingEnabled = true;
|
||||
} else {
|
||||
TEST(true); // Auto-throttling unspecified
|
||||
if (autoThrottlingEnabled.get().present()) {
|
||||
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->ratekeeper->id)
|
||||
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id)
|
||||
.detail("Value", autoThrottlingEnabled.get().get());
|
||||
}
|
||||
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
|
||||
|
@ -432,7 +433,7 @@ class TagThrottlerImpl {
|
|||
|
||||
RkTagThrottleCollection updatedTagThrottles;
|
||||
|
||||
TraceEvent("RatekeeperReadThrottledTags", self->ratekeeper->id)
|
||||
TraceEvent("RatekeeperReadThrottledTags", self->id)
|
||||
.detail("NumThrottledTags", throttledTagKeys.get().size());
|
||||
for (auto entry : throttledTagKeys.get()) {
|
||||
TagThrottleKey tagKey = TagThrottleKey::fromKey(entry.key);
|
||||
|
@ -458,14 +459,14 @@ class TagThrottlerImpl {
|
|||
|
||||
if (tagKey.throttleType == TagThrottleType::AUTO) {
|
||||
updatedTagThrottles.autoThrottleTag(
|
||||
self->ratekeeper->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
|
||||
self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
|
||||
if (tagValue.reason == TagThrottledReason::BUSY_READ) {
|
||||
updatedTagThrottles.busyReadTagCount++;
|
||||
} else if (tagValue.reason == TagThrottledReason::BUSY_WRITE) {
|
||||
updatedTagThrottles.busyWriteTagCount++;
|
||||
}
|
||||
} else {
|
||||
updatedTagThrottles.manualThrottleTag(self->ratekeeper->id,
|
||||
updatedTagThrottles.manualThrottleTag(self->id,
|
||||
tag,
|
||||
tagKey.priority,
|
||||
tagValue.tpsRate,
|
||||
|
@ -483,11 +484,11 @@ class TagThrottlerImpl {
|
|||
committed = true;
|
||||
|
||||
wait(watchFuture);
|
||||
TraceEvent("RatekeeperThrottleSignaled", self->ratekeeper->id).log();
|
||||
TraceEvent("RatekeeperThrottleSignaled", self->id).log();
|
||||
TEST(true); // Tag throttle changes detected
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("RatekeeperMonitorThrottlingChangesError", self->ratekeeper->id).error(e);
|
||||
TraceEvent("RatekeeperMonitorThrottlingChangesError", self->id).error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
@ -503,12 +504,12 @@ class TagThrottlerImpl {
|
|||
// currently MIN_TAG_PAGES_RATE > MIN_TAG_COST in our default knobs.
|
||||
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
|
||||
TEST(true); // Transaction tag auto-throttled
|
||||
Optional<double> clientRate = autoThrottleTag(ratekeeper->id, tag, busyness);
|
||||
Optional<double> clientRate = autoThrottleTag(id, tag, busyness);
|
||||
if (clientRate.present()) {
|
||||
TagSet tags;
|
||||
tags.addTag(tag);
|
||||
|
||||
Reference<DatabaseContext> dbRef = Reference<DatabaseContext>::addRef(ratekeeper->db.getPtr());
|
||||
Reference<DatabaseContext> dbRef = Reference<DatabaseContext>::addRef(db.getPtr());
|
||||
return ThrottleApi::throttleTags(dbRef,
|
||||
tags,
|
||||
clientRate.get(),
|
||||
|
@ -523,7 +524,7 @@ class TagThrottlerImpl {
|
|||
}
|
||||
|
||||
public:
|
||||
TagThrottlerImpl(Ratekeeper const* ratekeeper) : ratekeeper(ratekeeper) {}
|
||||
TagThrottlerImpl(Database db, UID id) : db(db), id(id) {}
|
||||
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
|
||||
|
||||
void addRequests(TransactionTag tag, int count) { throttledTags.addRequests(tag, count); }
|
||||
|
@ -561,7 +562,7 @@ public:
|
|||
|
||||
}; // class TagThrottlerImpl
|
||||
|
||||
TagThrottler::TagThrottler(Ratekeeper* ratekeeper) : impl(PImpl<TagThrottlerImpl>::create(ratekeeper)) {}
|
||||
TagThrottler::TagThrottler(Database db, UID id) : impl(PImpl<TagThrottlerImpl>::create(db, id)) {}
|
||||
TagThrottler::~TagThrottler() = default;
|
||||
Future<Void> TagThrottler::monitorThrottlingChanges() {
|
||||
return impl->monitorThrottlingChanges();
|
||||
|
|
|
@ -27,7 +27,7 @@ class TagThrottler {
|
|||
PImpl<class TagThrottlerImpl> impl;
|
||||
|
||||
public:
|
||||
TagThrottler(Ratekeeper* ratekeeper);
|
||||
TagThrottler(Database db, UID id);
|
||||
~TagThrottler();
|
||||
Future<Void> monitorThrottlingChanges();
|
||||
void addRequests(TransactionTag tag, int count);
|
||||
|
|
Loading…
Reference in New Issue