Add throttling ratio to GlobalTagThrottler

This commit is contained in:
sfc-gh-tclinkenbeard 2022-07-03 19:09:28 -07:00
parent ab2110142a
commit fb6540ab43
2 changed files with 110 additions and 12 deletions

View File

@ -37,17 +37,21 @@ class GlobalTagThrottlerImpl {
Smoother transactionCounter;
Smoother perClientRate;
Optional<double> getReadTPSLimit() const {
Optional<double> getReadTPSLimit(Optional<double> maxDesiredCost) const {
if (totalReadCostRate.smoothTotal() > 0) {
return quota.get().totalReadQuota * transactionCounter.smoothRate() / totalReadCostRate.smoothTotal();
auto const desiredReadCost = maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalReadQuota) : quota.get().totalReadQuota;
auto const averageCostPerTransaction = transactionCounter.smoothRate() / totalReadCostRate.smoothTotal();
return desiredReadCost * averageCostPerTransaction;
} else {
return {};
}
}
Optional<double> getWriteTPSLimit() const {
Optional<double> getWriteTPSLimit(Optional<double> maxDesiredCost) const {
if (totalWriteCostRate.smoothTotal() > 0) {
return quota.get().totalWriteQuota * transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
auto const desiredWriteCost = maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalWriteQuota) : quota.get().totalWriteQuota;
auto const averageCostPerTransaction = transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
return desiredWriteCost * averageCostPerTransaction;
} else {
return {};
}
@ -62,6 +66,18 @@ class GlobalTagThrottlerImpl {
void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; }
Optional<ThrottleApi::TagQuotaValue> const &getQuota() const {
return quota;
}
double getReadCostRate() const {
return totalReadCostRate.smoothTotal();
}
double getWriteCostRate() const {
return totalWriteCostRate.smoothTotal();
}
void updateReadCostRate(UID ssId, double newReadCostRate) {
auto& currentReadCostRate = ssToReadCostRate[ssId];
auto diff = newReadCostRate - currentReadCostRate;
@ -78,11 +94,11 @@ class GlobalTagThrottlerImpl {
void addTransactions(int count) { transactionCounter.addDelta(count); }
Optional<double> getTargetTotalTPSLimit() const {
Optional<double> getTargetTotalTPSLimit(Optional<double> maxReadCostRate, Optional<double> maxWriteCostRate) const {
if (!quota.present())
return {};
auto readLimit = getReadTPSLimit();
auto writeLimit = getWriteTPSLimit();
auto readLimit = getReadTPSLimit(maxReadCostRate);
auto writeLimit = getWriteTPSLimit(maxWriteCostRate);
if (!readLimit.present() && !writeLimit.present()) {
return {};
@ -97,8 +113,8 @@ class GlobalTagThrottlerImpl {
}
}
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit() {
auto targetRate = getTargetTotalTPSLimit();
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit(Optional<double> maxReadCostRate, Optional<double> maxWriteCostRate) {
auto targetRate = getTargetTotalTPSLimit(maxReadCostRate, maxWriteCostRate);
if (targetRate.present() && transactionCounter.smoothRate() > 0) {
auto newPerClientRate = std::max(
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE,
@ -113,8 +129,8 @@ class GlobalTagThrottlerImpl {
void processTraceEvent(TraceEvent& te) const {
if (quota.present()) {
te.detail("ProvidedReadTPSLimit", getReadTPSLimit())
.detail("ProvidedWriteTPSLimit", getWriteTPSLimit())
te.detail("ProvidedReadTPSLimit", getReadTPSLimit({}))
.detail("ProvidedWriteTPSLimit", getWriteTPSLimit({}))
.detail("ReadCostRate", totalReadCostRate.smoothTotal())
.detail("WriteCostRate", totalWriteCostRate.smoothTotal())
.detail("TotalReadQuota", quota.get().totalReadQuota)
@ -130,6 +146,47 @@ class GlobalTagThrottlerImpl {
std::map<TransactionTag, QuotaAndCounters> trackedTags;
uint64_t throttledTagChangeId{ 0 };
Future<Void> traceActor;
Optional<double> throttlingRatio;
double getQuotaRatio(TransactionTagRef tag, bool read, bool reserved) const {
int64_t sumQuota{ 0 };
int64_t tagQuota{ 0 };
for (const auto &[tag2, quotaAndCounters] : trackedTags) {
if (!quotaAndCounters.getQuota().present()) {
continue;
}
int64_t quota{ 0 };
if (read) {
if (reserved) {
quota = quotaAndCounters.getQuota().get().reservedReadQuota;
} else {
quota = quotaAndCounters.getQuota().get().totalReadQuota;
}
} else {
if (reserved) {
quota = quotaAndCounters.getQuota().get().reservedWriteQuota;
} else {
quota = quotaAndCounters.getQuota().get().totalWriteQuota;
}
}
sumQuota += quota;
if (tag == tag2) {
tagQuota = quota;
}
}
if (tagQuota == 0) return 0;
ASSERT_GT(sumQuota, 0);
return tagQuota / sumQuota;
}
// Returns the total cost rate (summed across all tags)
double getTotalCostRate(bool read) const {
double result{ 0 };
for (const auto &[tag, quotaAndCounters] : trackedTags) {
result += read ? quotaAndCounters.getReadCostRate() : quotaAndCounters.getWriteCostRate();
}
return result;
}
ACTOR static Future<Void> tracer(GlobalTagThrottlerImpl const* self) {
loop {
@ -186,6 +243,15 @@ class GlobalTagThrottlerImpl {
}
}
Optional<double> getMaxCostRate(TransactionTagRef tag, bool read, bool reserved) const {
if (throttlingRatio.present()) {
auto const desiredTotalCost = throttlingRatio.get() * getTotalCostRate(read);
return desiredTotalCost * getQuotaRatio(tag, read, reserved);
} else {
return {};
}
}
public:
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) { traceActor = tracer(this); }
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
@ -197,7 +263,7 @@ public:
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
for (auto& [tag, quotaAndCounters] : trackedTags) {
// Currently there is no differentiation between batch priority and default priority transactions
auto const limit = quotaAndCounters.updateAndGetPerClientLimit();
auto const limit = quotaAndCounters.updateAndGetPerClientLimit(getMaxCostRate(tag, true, false), getMaxCostRate(tag, false, false));
if (limit.present()) {
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
}
@ -214,6 +280,7 @@ public:
return 0;
}
int64_t manualThrottleCount() const { return trackedTags.size(); }
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
for (const auto& busyReadTag : ss.busiestReadTags) {
trackedTags[busyReadTag.tag].updateReadCostRate(ss.id, busyReadTag.rate);
@ -224,6 +291,10 @@ public:
return Void();
}
void setThrottlingRatio(Optional<double> ratio) {
throttlingRatio = ratio;
}
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
trackedTags[tag].setQuota(tagQuotaValue);
}
@ -265,6 +336,9 @@ bool GlobalTagThrottler::isAutoThrottlingEnabled() const {
Future<Void> GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
return impl->tryUpdateAutoThrottling(ss);
}
void GlobalTagThrottler::setThrottlingRatio(Optional<double> ratio) {
return impl->setThrottlingRatio(ratio);
}
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
return impl->setQuota(tag, tagQuotaValue);
@ -577,5 +651,25 @@ TEST_CASE("/GlobalTagThrottler/RemoveQuota") {
return Void();
}
// In this test, the throttling ratio remains below 1 indefinitely,
// so the throughput is eventually throttled down to the minimum possible throughput
TEST_CASE("/GlobalTagThrottler/ThrottleToZero") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
state ThrottleApi::TagQuotaValue tagQuotaValue;
state TransactionTag testTag = "sampleTag1"_sr;
tagQuotaValue.totalReadQuota = 100.0;
globalTagThrottler.setQuota(testTag, tagQuotaValue);
state Future<Void> client =
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
state Future<Void> updater =
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
wait(timeoutError(monitor || client || updater, 300.0));
globalTagThrottler.setThrottlingRatio(0.9);
// 1 is the minimum TPS rate guaranteed:
monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 1);
wait(timeoutError(monitor || client || updater, 300.0));
return Void();
}

View File

@ -88,6 +88,10 @@ public:
bool isAutoThrottlingEnabled() const override;
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
// Based on limiting storage queue size, set a ratio by which total throughput needs to be
// adjusted
void setThrottlingRatio(Optional<double>);
// testing only
public:
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);