Guarantee reserved quota in GlobalTagThrottler
This commit is contained in:
parent
8e08033974
commit
5b0eb135e0
|
@ -43,7 +43,10 @@ class GlobalTagThrottlerImpl {
|
|||
|
||||
Optional<double> getReadTPSLimit(Optional<double> maxDesiredCost) const {
|
||||
if (totalReadCostRate.smoothTotal() > 0) {
|
||||
auto const desiredReadCost = maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalReadQuota) : quota.get().totalReadQuota;
|
||||
auto const desiredReadCost =
|
||||
std::max(quota.get().reservedReadQuota,
|
||||
maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalReadQuota)
|
||||
: quota.get().totalReadQuota);
|
||||
auto const averageCostPerTransaction =
|
||||
totalReadCostRate.smoothTotal() / transactionCounter.smoothRate();
|
||||
return desiredReadCost / averageCostPerTransaction;
|
||||
|
@ -54,7 +57,10 @@ class GlobalTagThrottlerImpl {
|
|||
|
||||
Optional<double> getWriteTPSLimit(Optional<double> maxDesiredCost) const {
|
||||
if (totalWriteCostRate.smoothTotal() > 0) {
|
||||
auto const desiredWriteCost = maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalWriteQuota) : quota.get().totalWriteQuota;
|
||||
auto const desiredWriteCost =
|
||||
std::max(quota.get().reservedWriteQuota,
|
||||
maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalWriteQuota)
|
||||
: quota.get().totalWriteQuota);
|
||||
auto const averageCostPerTransaction = transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
|
||||
return desiredWriteCost * averageCostPerTransaction;
|
||||
} else {
|
||||
|
@ -153,7 +159,7 @@ class GlobalTagThrottlerImpl {
|
|||
Future<Void> traceActor;
|
||||
Optional<double> throttlingRatio;
|
||||
|
||||
double getQuotaRatio(TransactionTagRef tag, OpType opType, LimitType limitType) const {
|
||||
double getQuotaRatio(TransactionTagRef tag, OpType opType) const {
|
||||
double sumQuota{ 0.0 };
|
||||
double tagQuota{ 0.0 };
|
||||
for (const auto &[tag2, quotaAndCounters] : trackedTags) {
|
||||
|
@ -162,17 +168,9 @@ class GlobalTagThrottlerImpl {
|
|||
}
|
||||
int64_t quota{ 0 };
|
||||
if (opType == OpType::READ) {
|
||||
if (limitType == LimitType::RESERVED) {
|
||||
quota = quotaAndCounters.getQuota().get().reservedReadQuota;
|
||||
} else {
|
||||
quota = quotaAndCounters.getQuota().get().totalReadQuota;
|
||||
}
|
||||
quota = quotaAndCounters.getQuota().get().totalReadQuota;
|
||||
} else {
|
||||
if (limitType == LimitType::RESERVED) {
|
||||
quota = quotaAndCounters.getQuota().get().reservedWriteQuota;
|
||||
} else {
|
||||
quota = quotaAndCounters.getQuota().get().totalWriteQuota;
|
||||
}
|
||||
quota = quotaAndCounters.getQuota().get().totalWriteQuota;
|
||||
}
|
||||
sumQuota += quota;
|
||||
if (tag.compare(tag2) == 0) {
|
||||
|
@ -249,10 +247,10 @@ class GlobalTagThrottlerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
Optional<double> getMaxCostRate(TransactionTagRef tag, OpType opType, LimitType limitType) const {
|
||||
Optional<double> getMaxCostRate(TransactionTagRef tag, OpType opType) const {
|
||||
if (throttlingRatio.present()) {
|
||||
auto const desiredTotalCostRate = throttlingRatio.get() * getTotalCostRate(opType);
|
||||
return desiredTotalCostRate * getQuotaRatio(tag, opType, limitType);
|
||||
return desiredTotalCostRate * getQuotaRatio(tag, opType);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
|
@ -264,14 +262,11 @@ public:
|
|||
void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); }
|
||||
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
|
||||
// TODO: For now, only enforce total throttling rates.
|
||||
// We should use reserved quotas as well.
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
|
||||
for (auto& [tag, quotaAndCounters] : trackedTags) {
|
||||
// Currently there is no differentiation between batch priority and default priority transactions
|
||||
auto const limit =
|
||||
quotaAndCounters.updateAndGetPerClientLimit(getMaxCostRate(tag, OpType::READ, LimitType::TOTAL),
|
||||
getMaxCostRate(tag, OpType::WRITE, LimitType::TOTAL));
|
||||
auto const limit = quotaAndCounters.updateAndGetPerClientLimit(getMaxCostRate(tag, OpType::READ),
|
||||
getMaxCostRate(tag, OpType::WRITE));
|
||||
if (limit.present()) {
|
||||
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
|
||||
}
|
||||
|
@ -752,3 +747,20 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling") {
|
|||
wait(timeoutError(waitForAny(futures) || (monitor1 && monitor2), 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/ReservedQuota") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);
|
||||
state ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
state TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
tagQuotaValue.reservedReadQuota = 70.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 10.0, 6.0, false);
|
||||
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 70 / 6.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue