diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp index fa33bd8ff2..3967868186 100644 --- a/fdbserver/GlobalTagThrottler.actor.cpp +++ b/fdbserver/GlobalTagThrottler.actor.cpp @@ -647,12 +647,13 @@ class MockStorageServer { }; UID id; - double targetCost; + // bytes/second that this storage server can handle + double capacity; std::map readCosts, writeCosts; Cost totalReadCost, totalWriteCost; public: - explicit MockStorageServer(UID id, double targetCost) : id(id), targetCost(targetCost) { ASSERT_GT(targetCost, 0); } + explicit MockStorageServer(UID id, double capacity) : id(id), capacity(capacity) { ASSERT_GT(capacity, 0); } void addReadCost(TransactionTag tag, double cost) { readCosts[tag] += cost; totalReadCost += cost; @@ -662,8 +663,10 @@ public: totalWriteCost += cost; } + void setCapacity(double value) { capacity = value; } + StorageQueueInfo getStorageQueueInfo() const { - StorageQueueInfo result(id, LocalityData{}); + StorageQueueInfo result(id, LocalityData({}, Value(id.toString()), {}, {})); for (const auto& [tag, readCost] : readCosts) { double fractionalBusyness{ 0.0 }; // unused for global tag throttling result.busiestReadTags.emplace_back(tag, readCost.smoothRate(), fractionalBusyness); @@ -672,7 +675,7 @@ public: double fractionalBusyness{ 0.0 }; // unused for global tag throttling result.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness); } - result.lastReply.bytesInput = ((totalReadCost.smoothRate() + totalWriteCost.smoothRate()) / targetCost) * + result.lastReply.bytesInput = ((totalReadCost.smoothRate() + totalWriteCost.smoothRate()) / capacity) * SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER; return result; } @@ -712,6 +715,8 @@ public: } } + void setCapacity(int index, double value) { storageServers[index].setCapacity(value); } + std::vector getStorageQueueInfos() const { std::vector result; result.reserve(storageServers.size()); @@ -1176,3 +1181,30 @@ TEST_CASE("/GlobalTagThrottler/TagLimit") { ASSERT_EQ(globalTagThrottler.tagsTracked(), SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED); return Void(); } + +// 9 storage servers can handle 100 bytes/second each. +// 1 unhealthy storage server can only handle 1 byte/second. +// Total quota is set to 100 bytes/second. +// Client attempts 5 6-byte transactions per second. +// Target rate adjusts to 100/6 transactions per second, ignoring the worst storage server. +// Then, a second storage server becomes unhealthy and can only handle 1 byte/second. +// Target rate adjusts down to 1/6 transactions per second, because only one bad zone can be ignored. +TEST_CASE("/GlobalTagThrottler/IgnoreWorstZone") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}, 1); + state StorageServerCollection storageServers(10, 100); + state TransactionTag testTag = "sampleTag1"_sr; + storageServers.setCapacity(0, 1); + ThrottleApi::TagQuotaValue tagQuotaValue; + tagQuotaValue.totalQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, OpType::READ); + state Future monitor = monitorActor( + &globalTagThrottler, [](auto& gtt) { return targetRateIsNear(gtt, "sampleTag1"_sr, 100.0 / 6.0); }); + state Future updater = updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 600.0)); + storageServers.setCapacity(1, 1); + monitor = + monitorActor(&globalTagThrottler, [](auto& gtt) { return targetRateIsNear(gtt, "sampleTag1"_sr, 1.0 / 6.0); }); + wait(timeoutError(monitor || client || updater, 600.0)); + return Void(); +}