Add /GlobalTagThrottler/WriteThrottling unit test
This commit is contained in:
parent
a423fc77db
commit
d692516f64
|
@ -258,22 +258,23 @@ Optional<double> testGetTPSLimit(GlobalTagThrottler& globalTagThrottler, Transac
|
|||
}
|
||||
|
||||
class TestStorageServers {
|
||||
class ReadCost {
|
||||
class Cost {
|
||||
Smoother smoother;
|
||||
|
||||
public:
|
||||
ReadCost() : smoother(5.0) {}
|
||||
ReadCost& operator+=(double delta) {
|
||||
Cost() : smoother(5.0) {}
|
||||
Cost& operator+=(double delta) {
|
||||
smoother.addDelta(delta);
|
||||
return *this;
|
||||
}
|
||||
double smoothRate() const { return smoother.smoothRate(); }
|
||||
};
|
||||
|
||||
std::vector<std::map<TransactionTag, ReadCost>> readCosts;
|
||||
std::vector<std::map<TransactionTag, Cost>> readCosts;
|
||||
std::vector<std::map<TransactionTag, Cost>> writeCosts;
|
||||
|
||||
public:
|
||||
TestStorageServers(size_t size) : readCosts(size) { ASSERT_GT(size, 0); }
|
||||
TestStorageServers(size_t size) : readCosts(size), writeCosts(size) { ASSERT_GT(size, 0); }
|
||||
|
||||
void addReadCost(TransactionTag tag, double cost) {
|
||||
auto const costPerSS = cost / readCosts.size();
|
||||
|
@ -282,6 +283,13 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
void addWriteCost(TransactionTag tag, double cost) {
|
||||
auto const costPerSS = cost / writeCosts.size();
|
||||
for (auto& writeCost : writeCosts) {
|
||||
writeCost[tag] += costPerSS;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<StorageQueueInfo> getStorageQueueInfos() const {
|
||||
std::vector<StorageQueueInfo> result;
|
||||
result.reserve(readCosts.size());
|
||||
|
@ -291,6 +299,10 @@ public:
|
|||
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
|
||||
sqInfo.busiestReadTags.emplace_back(tag, readCost.smoothRate(), fractionalBusyness);
|
||||
}
|
||||
for (const auto& [tag, writeCost] : writeCosts[i]) {
|
||||
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
|
||||
sqInfo.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
|
||||
}
|
||||
result.push_back(sqInfo);
|
||||
}
|
||||
return result;
|
||||
|
@ -301,12 +313,17 @@ ACTOR static Future<Void> testClient(GlobalTagThrottler* globalTagThrottler,
|
|||
TestStorageServers* testStorageServers,
|
||||
TransactionTag tag,
|
||||
double desiredTpsRate,
|
||||
double costPerTransaction) {
|
||||
double costPerTransaction,
|
||||
bool write) {
|
||||
loop {
|
||||
auto tpsLimit = testGetTPSLimit(*globalTagThrottler, tag);
|
||||
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
|
||||
wait(delay(1 / tpsRate));
|
||||
testStorageServers->addReadCost(tag, costPerTransaction);
|
||||
if (write) {
|
||||
testStorageServers->addWriteCost(tag, costPerTransaction);
|
||||
} else {
|
||||
testStorageServers->addReadCost(tag, costPerTransaction);
|
||||
}
|
||||
globalTagThrottler->addRequests(tag, 1);
|
||||
}
|
||||
}
|
||||
|
@ -354,7 +371,21 @@ TEST_CASE("/GlobalTagThrottler/NoActiveThrottling") {
|
|||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client = testClient(&globalTagThrottler, &testStorageServers, testTag, 5.0, 6.0);
|
||||
state Future<Void> client = testClient(&globalTagThrottler, &testStorageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> monitor = monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &testStorageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/WriteThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state TestStorageServers testStorageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalWriteQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client = testClient(&globalTagThrottler, &testStorageServers, testTag, 5.0, 6.0, true);
|
||||
state Future<Void> monitor = monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &testStorageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
|
@ -368,7 +399,7 @@ TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
|
|||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client = testClient(&globalTagThrottler, &testStorageServers, testTag, 20.0, 10.0);
|
||||
state Future<Void> client = testClient(&globalTagThrottler, &testStorageServers, testTag, 20.0, 10.0, false);
|
||||
state Future<Void> monitor = monitorClientRates(&globalTagThrottler, testTag, 10.0);
|
||||
state Future<Void> updater = updateGlobalTagThrottler(&globalTagThrottler, &testStorageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
|
|
Loading…
Reference in New Issue