Merge pull request #8444 from sfc-gh-tclinkenbeard/limit-gtt-size
Limit number of tags tracked by `GlobalTagThrottler`
This commit is contained in:
commit
cf935a1915
|
@ -726,6 +726,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
|
||||
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
|
||||
|
||||
//Storage Metrics
|
||||
init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 );
|
||||
|
|
|
@ -624,6 +624,12 @@ public:
|
|||
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
|
||||
// Cost multiplier for writes (because write operations are more expensive than reads)
|
||||
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
|
||||
// Maximum number of tags tracked by global tag throttler. Additional tags will be ignored
|
||||
// until some existing tags expire
|
||||
int64_t GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED;
|
||||
// Global tag throttler forgets about throughput from a tag once no new transactions from that
|
||||
// tag have been received for this duration (in seconds):
|
||||
int64_t GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER;
|
||||
|
||||
double MAX_TRANSACTIONS_PER_BYTE;
|
||||
|
||||
|
|
|
@ -120,12 +120,13 @@ class GlobalTagThrottlerImpl {
|
|||
Smoother transactionCounter;
|
||||
Smoother perClientRate;
|
||||
Smoother targetRate;
|
||||
double transactionsLastAdded;
|
||||
|
||||
public:
|
||||
explicit PerTagStatistics()
|
||||
: transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
targetRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
|
||||
targetRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), transactionsLastAdded(now()) {}
|
||||
|
||||
Optional<ThrottleApi::TagQuotaValue> getQuota() const { return quota; }
|
||||
|
||||
|
@ -133,7 +134,10 @@ class GlobalTagThrottlerImpl {
|
|||
|
||||
void clearQuota() { quota = {}; }
|
||||
|
||||
void addTransactions(int count) { transactionCounter.addDelta(count); }
|
||||
void addTransactions(int count) {
|
||||
transactionsLastAdded = now();
|
||||
transactionCounter.addDelta(count);
|
||||
}
|
||||
|
||||
double getTransactionRate() const { return transactionCounter.smoothRate(); }
|
||||
|
||||
|
@ -151,6 +155,10 @@ class GlobalTagThrottlerImpl {
|
|||
targetRate.setTotal(targetTps);
|
||||
return targetRate.smoothTotal();
|
||||
}
|
||||
|
||||
bool recentTransactionsAdded() const {
|
||||
return now() - transactionsLastAdded < SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER;
|
||||
}
|
||||
};
|
||||
|
||||
Database db;
|
||||
|
@ -278,7 +286,7 @@ class GlobalTagThrottlerImpl {
|
|||
for (const auto& t : tagsAffectingStorageServer) {
|
||||
auto const tQuota = getQuota(t, LimitType::TOTAL);
|
||||
sumQuota += tQuota.orDefault(0);
|
||||
if (tag.compare(tag) == 0) {
|
||||
if (t.compare(tag) == 0) {
|
||||
tagQuota = tQuota.orDefault(0);
|
||||
}
|
||||
}
|
||||
|
@ -360,6 +368,7 @@ class GlobalTagThrottlerImpl {
|
|||
tagsWithQuota.insert(tag);
|
||||
}
|
||||
self->removeUnseenQuotas(tagsWithQuota);
|
||||
self->removeExpiredTags();
|
||||
++self->throttledTagChangeId;
|
||||
wait(delay(5.0));
|
||||
break;
|
||||
|
@ -397,7 +406,24 @@ class GlobalTagThrottlerImpl {
|
|||
public:
|
||||
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) {}
|
||||
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
|
||||
void addRequests(TransactionTag tag, int count) { tagStatistics[tag].addTransactions(static_cast<double>(count)); }
|
||||
void addRequests(TransactionTag tag, int count) {
|
||||
auto it = tagStatistics.find(tag);
|
||||
if (it == tagStatistics.end()) {
|
||||
if (tagStatistics.size() == SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED) {
|
||||
CODE_PROBE(true,
|
||||
"Global tag throttler ignoring transactions because maximum number of trackable tags has "
|
||||
"been reached");
|
||||
TraceEvent("GlobalTagThrottler_IgnoringRequests")
|
||||
.suppressFor(1.0)
|
||||
.detail("Tag", printable(tag))
|
||||
.detail("Count", count);
|
||||
} else {
|
||||
tagStatistics[tag].addTransactions(static_cast<double>(count));
|
||||
}
|
||||
} else {
|
||||
it->second.addTransactions(static_cast<double>(count));
|
||||
}
|
||||
}
|
||||
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
|
||||
|
||||
TransactionTagMap<double> getProxyRates(int numProxies) {
|
||||
|
@ -465,10 +491,14 @@ public:
|
|||
throttlingRatios[ss.id] = ss.getThrottlingRatio(SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER,
|
||||
SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER);
|
||||
for (const auto& busyReadTag : ss.busiestReadTags) {
|
||||
throughput[ss.id][busyReadTag.tag].updateCost(busyReadTag.rate, OpType::READ);
|
||||
if (tagStatistics.find(busyReadTag.tag) != tagStatistics.end()) {
|
||||
throughput[ss.id][busyReadTag.tag].updateCost(busyReadTag.rate, OpType::READ);
|
||||
}
|
||||
}
|
||||
for (const auto& busyWriteTag : ss.busiestWriteTags) {
|
||||
throughput[ss.id][busyWriteTag.tag].updateCost(busyWriteTag.rate, OpType::WRITE);
|
||||
if (tagStatistics.find(busyWriteTag.tag) != tagStatistics.end()) {
|
||||
throughput[ss.id][busyWriteTag.tag].updateCost(busyWriteTag.rate, OpType::WRITE);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
@ -478,6 +508,22 @@ public:
|
|||
}
|
||||
|
||||
void removeQuota(TransactionTagRef tag) { tagStatistics[tag].clearQuota(); }
|
||||
|
||||
void removeExpiredTags() {
|
||||
for (auto it = tagStatistics.begin(); it != tagStatistics.end();) {
|
||||
const auto& [tag, stats] = *it;
|
||||
if (!stats.recentTransactionsAdded()) {
|
||||
for (auto& [ss, tagToCounters] : throughput) {
|
||||
tagToCounters.erase(tag);
|
||||
}
|
||||
it = tagStatistics.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t tagsTracked() const { return tagStatistics.size(); }
|
||||
};
|
||||
|
||||
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
|
||||
|
@ -526,6 +572,14 @@ void GlobalTagThrottler::removeQuota(TransactionTagRef tag) {
|
|||
return impl->removeQuota(tag);
|
||||
}
|
||||
|
||||
uint32_t GlobalTagThrottler::tagsTracked() const {
|
||||
return impl->tagsTracked();
|
||||
}
|
||||
|
||||
void GlobalTagThrottler::removeExpiredTags() {
|
||||
return impl->removeExpiredTags();
|
||||
}
|
||||
|
||||
namespace GlobalTagThrottlerTesting {
|
||||
|
||||
enum class LimitType { RESERVED, TOTAL };
|
||||
|
@ -1025,3 +1079,47 @@ TEST_CASE("/GlobalTagThrottler/ReservedQuota") {
|
|||
wait(timeoutError(monitor || client || updater, 600.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Test that tags are expired iff a sufficient amount of time has passed since the
|
||||
// last transaction with that tag
|
||||
TEST_CASE("/GlobalTagThrottler/ExpireTags") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
|
||||
state Future<Void> client =
|
||||
timeout(GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, testTag, 10.0, 6.0, GlobalTagThrottlerTesting::OpType::READ),
|
||||
60.0,
|
||||
Void());
|
||||
state Future<Void> updater = timeout(
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers), 60.0, Void());
|
||||
wait(client && updater);
|
||||
client.cancel();
|
||||
updater.cancel();
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 1);
|
||||
globalTagThrottler.removeExpiredTags();
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 1);
|
||||
wait(delay(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER + 1.0));
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 1);
|
||||
globalTagThrottler.removeExpiredTags();
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), 0);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Test that the number of tags tracked does not grow beyond SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED
|
||||
TEST_CASE("/GlobalTagThrottler/TagLimit") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10, 5);
|
||||
std::vector<Future<Void>> futures;
|
||||
for (int i = 0; i < 2 * SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED; ++i) {
|
||||
Arena arena;
|
||||
TransactionTag tag = makeString(8, arena);
|
||||
deterministicRandom()->randomBytes(mutateString(tag), tag.size());
|
||||
futures.push_back(GlobalTagThrottlerTesting::runClient(
|
||||
&globalTagThrottler, &storageServers, tag, 1.0, 6.0, GlobalTagThrottlerTesting::OpType::READ));
|
||||
}
|
||||
wait(timeout(waitForAll(futures), 60.0, Void()));
|
||||
ASSERT_EQ(globalTagThrottler.tagsTracked(), SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED);
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -100,4 +100,6 @@ public:
|
|||
public:
|
||||
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
|
||||
void removeQuota(TransactionTagRef);
|
||||
void removeExpiredTags();
|
||||
uint32_t tagsTracked() const;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue