diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp index b415aec795..00cde6eafc 100644 --- a/fdbserver/GlobalTagThrottler.actor.cpp +++ b/fdbserver/GlobalTagThrottler.actor.cpp @@ -43,7 +43,10 @@ class GlobalTagThrottlerImpl { Optional getReadTPSLimit(Optional maxDesiredCost) const { if (totalReadCostRate.smoothTotal() > 0) { - auto const desiredReadCost = maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalReadQuota) : quota.get().totalReadQuota; + auto const desiredReadCost = + std::max(quota.get().reservedReadQuota, + maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalReadQuota) + : quota.get().totalReadQuota); auto const averageCostPerTransaction = totalReadCostRate.smoothTotal() / transactionCounter.smoothRate(); return desiredReadCost / averageCostPerTransaction; @@ -54,7 +57,10 @@ class GlobalTagThrottlerImpl { Optional getWriteTPSLimit(Optional maxDesiredCost) const { if (totalWriteCostRate.smoothTotal() > 0) { - auto const desiredWriteCost = maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalWriteQuota) : quota.get().totalWriteQuota; + auto const desiredWriteCost = + std::max(quota.get().reservedWriteQuota, + maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalWriteQuota) + : quota.get().totalWriteQuota); auto const averageCostPerTransaction = transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal(); return desiredWriteCost * averageCostPerTransaction; } else { @@ -153,7 +159,7 @@ class GlobalTagThrottlerImpl { Future traceActor; Optional throttlingRatio; - double getQuotaRatio(TransactionTagRef tag, OpType opType, LimitType limitType) const { + double getQuotaRatio(TransactionTagRef tag, OpType opType) const { double sumQuota{ 0.0 }; double tagQuota{ 0.0 }; for (const auto &[tag2, quotaAndCounters] : trackedTags) { @@ -162,17 +168,9 @@ class GlobalTagThrottlerImpl { } int64_t quota{ 0 }; if (opType == OpType::READ) { - if (limitType == LimitType::RESERVED) { - quota = quotaAndCounters.getQuota().get().reservedReadQuota; - } else { - quota = quotaAndCounters.getQuota().get().totalReadQuota; - } + quota = quotaAndCounters.getQuota().get().totalReadQuota; } else { - if (limitType == LimitType::RESERVED) { - quota = quotaAndCounters.getQuota().get().reservedWriteQuota; - } else { - quota = quotaAndCounters.getQuota().get().totalWriteQuota; - } + quota = quotaAndCounters.getQuota().get().totalWriteQuota; } sumQuota += quota; if (tag.compare(tag2) == 0) { @@ -249,10 +247,10 @@ class GlobalTagThrottlerImpl { } } - Optional getMaxCostRate(TransactionTagRef tag, OpType opType, LimitType limitType) const { + Optional getMaxCostRate(TransactionTagRef tag, OpType opType) const { if (throttlingRatio.present()) { auto const desiredTotalCostRate = throttlingRatio.get() * getTotalCostRate(opType); - return desiredTotalCostRate * getQuotaRatio(tag, opType, limitType); + return desiredTotalCostRate * getQuotaRatio(tag, opType); } else { return {}; } @@ -264,14 +262,11 @@ public: void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); } uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; } PrioritizedTransactionTagMap getClientRates() { - // TODO: For now, only enforce total throttling rates. - // We should use reserved quotas as well. PrioritizedTransactionTagMap result; for (auto& [tag, quotaAndCounters] : trackedTags) { // Currently there is no differentiation between batch priority and default priority transactions - auto const limit = - quotaAndCounters.updateAndGetPerClientLimit(getMaxCostRate(tag, OpType::READ, LimitType::TOTAL), - getMaxCostRate(tag, OpType::WRITE, LimitType::TOTAL)); + auto const limit = quotaAndCounters.updateAndGetPerClientLimit(getMaxCostRate(tag, OpType::READ), + getMaxCostRate(tag, OpType::WRITE)); if (limit.present()) { result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get(); } @@ -752,3 +747,20 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling") { wait(timeoutError(waitForAny(futures) || (monitor1 && monitor2), 300.0)); return Void(); } + +TEST_CASE("/GlobalTagThrottler/ReservedQuota") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5); + state ThrottleApi::TagQuotaValue tagQuotaValue; + state TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + tagQuotaValue.reservedReadQuota = 70.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 10.0, 6.0, false); + state Future monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 70 / 6.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +}