Add /GlobalTagThrottler/MultiTagActiveThrottling2 unit test and fix bug uncovered in GlobalTagThrottlerImpl::getLimitingTps
This commit is contained in:
parent
b4e711f1c3
commit
118eb4d93b
|
@ -312,7 +312,7 @@ class GlobalTagThrottlerImpl {
|
|||
auto const targetTpsForSS = getLimitingTps(id, tag, opType);
|
||||
if (result.present() && targetTpsForSS.present()) {
|
||||
result = std::min(result.get(), targetTpsForSS.get());
|
||||
} else {
|
||||
} else if (targetTpsForSS.present()) {
|
||||
result = targetTpsForSS;
|
||||
}
|
||||
}
|
||||
|
@ -660,17 +660,31 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
void addReadCost(TransactionTag tag, double cost) {
|
||||
auto const costPerSS = cost / storageServers.size();
|
||||
for (auto& storageServer : storageServers) {
|
||||
storageServer.addReadCost(tag, costPerSS);
|
||||
void addReadCost(TransactionTag tag, double cost, std::vector<int> storageServerIndices) {
|
||||
if (storageServerIndices.empty()) {
|
||||
auto const costPerSS = cost / storageServers.size();
|
||||
for (auto& storageServer : storageServers) {
|
||||
storageServer.addReadCost(tag, costPerSS);
|
||||
}
|
||||
} else {
|
||||
auto const costPerSS = cost / storageServerIndices.size();
|
||||
for (auto i : storageServerIndices) {
|
||||
storageServers[i].addReadCost(tag, costPerSS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void addWriteCost(TransactionTag tag, double cost) {
|
||||
auto const costPerSS = cost / storageServers.size();
|
||||
for (auto& storageServer : storageServers) {
|
||||
storageServer.addWriteCost(tag, costPerSS);
|
||||
void addWriteCost(TransactionTag tag, double cost, std::vector<int> storageServerIndices) {
|
||||
if (storageServerIndices.empty()) {
|
||||
auto const costPerSS = cost / storageServers.size();
|
||||
for (auto& storageServer : storageServers) {
|
||||
storageServer.addWriteCost(tag, costPerSS);
|
||||
}
|
||||
} else {
|
||||
auto const costPerSS = cost / storageServerIndices.size();
|
||||
for (auto i : storageServerIndices) {
|
||||
storageServers[i].addWriteCost(tag, costPerSS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -689,15 +703,16 @@ ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
|
|||
TransactionTag tag,
|
||||
double desiredTpsRate,
|
||||
double costPerTransaction,
|
||||
OpType opType) {
|
||||
OpType opType,
|
||||
std::vector<int> storageServerIndices = std::vector<int>()) {
|
||||
loop {
|
||||
auto tpsLimit = getTPSLimit(*globalTagThrottler, tag);
|
||||
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
|
||||
wait(delay(1 / tpsRate));
|
||||
if (opType == OpType::WRITE) {
|
||||
storageServers->addWriteCost(tag, costPerTransaction);
|
||||
storageServers->addWriteCost(tag, costPerTransaction, storageServerIndices);
|
||||
} else {
|
||||
storageServers->addReadCost(tag, costPerTransaction);
|
||||
storageServers->addReadCost(tag, costPerTransaction, storageServerIndices);
|
||||
}
|
||||
globalTagThrottler->addRequests(tag, 1);
|
||||
}
|
||||
|
@ -983,6 +998,32 @@ TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling") {
|
|||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/MultiTagActiveThrottling2") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(3, 50);
|
||||
state ThrottleApi::TagQuotaValue tagQuotaValue1;
|
||||
state ThrottleApi::TagQuotaValue tagQuotaValue2;
|
||||
TransactionTag testTag1 = "sampleTag1"_sr;
|
||||
TransactionTag testTag2 = "sampleTag2"_sr;
|
||||
tagQuotaValue1.totalReadQuota = tagQuotaValue1.totalWriteQuota = 100.0;
|
||||
tagQuotaValue2.totalReadQuota = tagQuotaValue2.totalWriteQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag1, tagQuotaValue1);
|
||||
globalTagThrottler.setQuota(testTag2, tagQuotaValue2);
|
||||
std::vector<Future<Void>> futures;
|
||||
futures.push_back(GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, testTag1, 10.0, 6.0, OpType::READ, { 0, 1 }));
|
||||
futures.push_back(GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, testTag2, 10.0, 6.0, OpType::READ, { 1, 2 }));
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitor(&globalTagThrottler, [testTag1, testTag2](auto& gtt) {
|
||||
return GlobalTagThrottlerTesting::rateIsNear(gtt, testTag1, 50 / 6.0) &&
|
||||
GlobalTagThrottlerTesting::rateIsNear(gtt, testTag2, 50 / 6.0) && gtt.busyReadTagCount() == 2;
|
||||
});
|
||||
futures.push_back(GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers));
|
||||
wait(timeoutError(waitForAny(futures) || monitor, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/ReservedReadQuota") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);
|
||||
|
|
Loading…
Reference in New Issue