Update tag throttling latency bands in GrvProxyData::updateLatencyBandConfig

This commit is contained in:
sfc-gh-tclinkenbeard 2022-10-26 10:48:51 -07:00
parent ecac8f68e7
commit 08251329a9
3 changed files with 15 additions and 8 deletions

View File

@ -183,6 +183,8 @@ struct GrvProxyData {
Version version;
Version minKnownCommittedVersion; // we should ask master for this version.
GrvProxyTransactionTagThrottler tagThrottler;
// Cache of the latest commit versions of storage servers.
VersionVector ssVersionVectorCache;
@ -195,6 +197,7 @@ struct GrvProxyData {
if (newLatencyBandConfig.present()) {
for (auto band : newLatencyBandConfig.get().grvConfig.bands) {
stats.grvLatencyBands.addThreshold(band);
tagThrottler.addLatencyBandThreshold(band);
}
}
}
@ -363,7 +366,6 @@ ACTOR Future<Void> getRate(UID myID,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* clientThrottledTags,
GrvProxyTransactionTagThrottler* tagThrottler,
GrvProxyStats* stats,
GrvProxyData* proxyData) {
state Future<Void> nextRequestTimer = Never();
@ -424,7 +426,7 @@ ACTOR Future<Void> getRate(UID myID,
*clientThrottledTags = std::move(rep.clientThrottledTags.get());
}
if (rep.proxyThrottledTags.present()) {
tagThrottler->updateRates(rep.proxyThrottledTags.get());
proxyData->tagThrottler.updateRates(rep.proxyThrottledTags.get());
}
}
when(wait(leaseTimeout)) {
@ -824,7 +826,6 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
state int64_t batchTransactionCount = 0;
state GrvTransactionRateInfo normalRateInfo(10);
state GrvTransactionRateInfo batchRateInfo(0);
state GrvProxyTransactionTagThrottler tagThrottler;
state SpannedDeque<GetReadVersionRequest> systemQueue("GP:transactionStarterSystemQueue"_loc);
state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:transactionStarterDefaultQueue"_loc);
@ -851,7 +852,6 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
detailedHealthMetricsReply,
&transactionTagCounter,
&clientThrottledTags,
&tagThrottler,
&grvProxyData->stats,
grvProxyData));
addActor.send(queueGetReadVersionRequests(db,
@ -866,7 +866,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
&grvProxyData->stats,
&batchRateInfo,
&transactionTagCounter,
&tagThrottler));
&grvProxyData->tagThrottler));
while (std::find(db->get().client.grvProxies.begin(), db->get().client.grvProxies.end(), proxy) ==
db->get().client.grvProxies.end()) {
@ -889,7 +889,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
elapsed = 1e-15;
}
tagThrottler.releaseTransactions(elapsed, defaultQueue, batchQueue);
grvProxyData->tagThrottler.releaseTransactions(elapsed, defaultQueue, batchQueue);
normalRateInfo.startReleaseWindow();
batchRateInfo.startReleaseWindow();

View File

@ -219,7 +219,12 @@ void GrvProxyTransactionTagThrottler::releaseTransactions(double elapsed,
ASSERT_EQ(transactionsReleased.capacity(), transactionsReleasedInitialCapacity);
}
uint32_t GrvProxyTransactionTagThrottler::size() {
void GrvProxyTransactionTagThrottler::addLatencyBandThreshold(double value) {
CODE_PROBE(size() > 0, "GrvProxyTransactionTagThrottler adding latency bands while actively throttling");
latencyBandsMap.addThreshold(value);
}
uint32_t GrvProxyTransactionTagThrottler::size() const {
return queues.size();
}

View File

@ -83,7 +83,9 @@ public:
void addRequest(GetReadVersionRequest const&);
void addLatencyBandThreshold(double value);
public: // testing
// Returns number of tags tracked
uint32_t size();
uint32_t size() const;
};