Add /GlobalTagThrottler/MultiTagActiveThrottling3 unit test

This commit is contained in:
sfc-gh-tclinkenbeard 2022-07-19 09:03:21 -07:00
parent 118eb4d93b
commit f197314cb5
1 changed files with 29 additions and 15 deletions

View File

@ -319,18 +319,6 @@ class GlobalTagThrottlerImpl {
return result;
}
Optional<double> getLimitingTps(TransactionTag tag) const {
auto const readLimitingTps = getLimitingTps(tag, OpType::READ);
auto const writeLimitingTps = getLimitingTps(tag, OpType::WRITE);
if (readLimitingTps.present() && writeLimitingTps.present()) {
return std::min(readLimitingTps.get(), writeLimitingTps.get());
} else if (readLimitingTps.present()) {
return readLimitingTps;
} else {
return writeLimitingTps;
}
}
Optional<double> getDesiredTps(TransactionTag tag, OpType opType, double averageTransactionCost) const {
auto const quota = getQuota(tag, opType, LimitType::TOTAL);
if (!quota.present()) {
@ -701,14 +689,14 @@ public:
ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
StorageServerCollection* storageServers,
TransactionTag tag,
double desiredTpsRate,
double tpsRate,
double costPerTransaction,
OpType opType,
std::vector<int> storageServerIndices = std::vector<int>()) {
loop {
auto tpsLimit = getTPSLimit(*globalTagThrottler, tag);
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
wait(delay(1 / tpsRate));
state double enforcedRate = tpsLimit.present() ? std::min<double>(tpsRate, tpsLimit.get()) : tpsRate;
wait(delay(1 / enforcedRate));
if (opType == OpType::WRITE) {
storageServers->addWriteCost(tag, costPerTransaction, storageServerIndices);
} else {
@ -1024,6 +1012,32 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling2") {
return Void();
}
TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling3") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(3, 50);
state ThrottleApi::TagQuotaValue tagQuotaValue1;
state ThrottleApi::TagQuotaValue tagQuotaValue2;
TransactionTag testTag1 = "sampleTag1"_sr;
TransactionTag testTag2 = "sampleTag2"_sr;
tagQuotaValue1.totalReadQuota = tagQuotaValue1.totalWriteQuota = 100.0;
tagQuotaValue2.totalReadQuota = tagQuotaValue2.totalWriteQuota = 100.0;
globalTagThrottler.setQuota(testTag1, tagQuotaValue1);
globalTagThrottler.setQuota(testTag2, tagQuotaValue2);
std::vector<Future<Void>> futures;
futures.push_back(GlobalTagThrottlerTesting::runClient(
&globalTagThrottler, &storageServers, testTag1, 10.0, 6.0, OpType::READ, { 0 }));
futures.push_back(GlobalTagThrottlerTesting::runClient(
&globalTagThrottler, &storageServers, testTag2, 10.0, 6.0, OpType::READ, { 1, 2 }));
state Future<Void> monitor =
GlobalTagThrottlerTesting::monitor(&globalTagThrottler, [testTag1, testTag2](auto& gtt) {
return GlobalTagThrottlerTesting::rateIsNear(gtt, testTag1, 50 / 6.0) &&
GlobalTagThrottlerTesting::rateIsNear(gtt, testTag2, 100 / 6.0) && gtt.busyReadTagCount() == 1;
});
futures.push_back(GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers));
wait(timeoutError(waitForAny(futures) || monitor, 300.0));
return Void();
}
TEST_CASE("/GlobalTagThrottler/ReservedReadQuota") {
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);