Add multiclient unit tests for GlobalTagThrottler
This commit is contained in:
parent
1505ef86db
commit
dd08c2b180
|
@ -35,6 +35,7 @@ class GlobalTagThrottlerImpl {
|
|||
Smoother totalReadCostRate;
|
||||
Smoother totalWriteCostRate;
|
||||
Smoother transactionCounter;
|
||||
Smoother perClientRate;
|
||||
|
||||
Optional<double> getReadTPSLimit() const {
|
||||
if (totalReadCostRate.smoothTotal() > 0) {
|
||||
|
@ -56,7 +57,8 @@ class GlobalTagThrottlerImpl {
|
|||
QuotaAndCounters()
|
||||
: totalReadCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
totalWriteCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
|
||||
transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
|
||||
|
||||
void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; }
|
||||
|
||||
|
@ -76,7 +78,7 @@ class GlobalTagThrottlerImpl {
|
|||
|
||||
void addTransactions(int count) { transactionCounter.addDelta(count); }
|
||||
|
||||
Optional<ClientTagThrottleLimits> getTotalLimit() const {
|
||||
Optional<double> getTargetTotalTPSLimit() const {
|
||||
if (!quota.present())
|
||||
return {};
|
||||
auto readLimit = getReadTPSLimit();
|
||||
|
@ -86,15 +88,27 @@ class GlobalTagThrottlerImpl {
|
|||
if (!readLimit.present() && !writeLimit.present()) {
|
||||
return {};
|
||||
} else {
|
||||
auto totalLimit = SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE;
|
||||
if (!readLimit.present()) {
|
||||
totalLimit = std::max(totalLimit, writeLimit.get());
|
||||
return writeLimit.get();
|
||||
} else if (!writeLimit.present()) {
|
||||
totalLimit = std::max(totalLimit, readLimit.get());
|
||||
return readLimit.get();
|
||||
} else {
|
||||
totalLimit = std::max(totalLimit, std::min(readLimit.get(), writeLimit.get()));
|
||||
return std::min(readLimit.get(), writeLimit.get());
|
||||
}
|
||||
return ClientTagThrottleLimits(totalLimit, ClientTagThrottleLimits::NO_EXPIRATION);
|
||||
}
|
||||
}
|
||||
|
||||
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit() {
|
||||
auto targetRate = getTargetTotalTPSLimit();
|
||||
if (targetRate.present() && transactionCounter.smoothRate() > 0) {
|
||||
auto newPerClientRate = std::max(
|
||||
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE,
|
||||
std::min(targetRate.get(),
|
||||
(targetRate.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal()));
|
||||
perClientRate.setTotal(newPerClientRate);
|
||||
return ClientTagThrottleLimits(perClientRate.getTotal(), ClientTagThrottleLimits::NO_EXPIRATION);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,9 +185,9 @@ public:
|
|||
// TODO: For now, only enforce total throttling rates.
|
||||
// We should use reserved quotas as well.
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
|
||||
for (const auto& [tag, quotaAndCounters] : trackedTags) {
|
||||
for (auto& [tag, quotaAndCounters] : trackedTags) {
|
||||
// Currently there is no differentiation between batch priority and default priority transactions
|
||||
auto const limit = quotaAndCounters.getTotalLimit();
|
||||
auto const limit = quotaAndCounters.updateAndGetPerClientLimit();
|
||||
if (limit.present()) {
|
||||
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
|
||||
}
|
||||
|
@ -245,7 +259,9 @@ void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaVa
|
|||
return impl->setQuota(tag, tagQuotaValue);
|
||||
}
|
||||
|
||||
Optional<double> testGetTPSLimit(GlobalTagThrottler& globalTagThrottler, TransactionTag tag) {
|
||||
namespace GlobalTagThrottlerTesting {
|
||||
|
||||
Optional<double> getTPSLimit(GlobalTagThrottler& globalTagThrottler, TransactionTag tag) {
|
||||
auto clientRates = globalTagThrottler.getClientRates();
|
||||
auto it1 = clientRates.find(TransactionPriority::DEFAULT);
|
||||
if (it1 != clientRates.end()) {
|
||||
|
@ -257,8 +273,6 @@ Optional<double> testGetTPSLimit(GlobalTagThrottler& globalTagThrottler, Transac
|
|||
return {};
|
||||
}
|
||||
|
||||
namespace GlobalTagThrottlerTesting {
|
||||
|
||||
class StorageServerCollection {
|
||||
class Cost {
|
||||
Smoother smoother;
|
||||
|
@ -318,7 +332,7 @@ ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
|
|||
double costPerTransaction,
|
||||
bool write) {
|
||||
loop {
|
||||
auto tpsLimit = testGetTPSLimit(*globalTagThrottler, tag);
|
||||
auto tpsLimit = getTPSLimit(*globalTagThrottler, tag);
|
||||
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
|
||||
wait(delay(1 / tpsRate));
|
||||
if (write) {
|
||||
|
@ -336,7 +350,7 @@ ACTOR static Future<Void> monitorClientRates(GlobalTagThrottler* globalTagThrott
|
|||
state int successes = 0;
|
||||
loop {
|
||||
wait(delay(1.0));
|
||||
auto currentTPSLimit = testGetTPSLimit(*globalTagThrottler, tag);
|
||||
auto currentTPSLimit = getTPSLimit(*globalTagThrottler, tag);
|
||||
if (currentTPSLimit.present()) {
|
||||
TraceEvent("GlobalTagThrottling_RateMonitor")
|
||||
.detail("Tag", tag)
|
||||
|
@ -368,7 +382,7 @@ ACTOR static Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTag
|
|||
|
||||
} // namespace GlobalTagThrottlerTesting
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/NoActiveThrottling") {
|
||||
TEST_CASE("/GlobalTagThrottler/Simple") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
|
@ -439,3 +453,40 @@ TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
|
|||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> client2 =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/MultiClientActiveThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
|
||||
state Future<Void> client2 =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
|
||||
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 5.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue