Send tag throttle updates from ratekeeper to proxy only when they change
This commit is contained in:
parent
444b338f63
commit
d5fb4d26fe
|
@ -216,11 +216,8 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
|
|||
lastDetailedReply = now();
|
||||
}
|
||||
|
||||
for(auto priorityTags : rep.throttledTags) {
|
||||
auto &localPriorityTags = (*throttledTags)[priorityTags.first];
|
||||
for(auto tag : priorityTags.second) { // TODO: remove missing tags
|
||||
localPriorityTags[tag.first] = tag.second;
|
||||
}
|
||||
if(rep.throttledTags.present()) {
|
||||
*throttledTags = rep.throttledTags;
|
||||
}
|
||||
}
|
||||
when ( wait( leaseTimeout ) ) {
|
||||
|
@ -257,7 +254,7 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
|||
req.reply.send(rep);
|
||||
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
|
||||
} else {
|
||||
// TODO: this probably needs to happen outside the high priority path
|
||||
// TODO: check whether this is reasonable to do in the fast path
|
||||
for(auto tag : req.tags) {
|
||||
(*transactionTagCounter)[tag.first] += tag.second;
|
||||
}
|
||||
|
@ -1381,9 +1378,8 @@ ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::
|
|||
|
||||
reply.tagThrottleInfo.clear();
|
||||
|
||||
auto& priorityThrottledTags = throttledTags[ThrottleApi::priorityFromReadVersionFlags(request.flags)];
|
||||
for(auto tag : request.tags) {
|
||||
auto& priorityThrottledTags = throttledTags[ThrottleApi::priorityFromReadVersionFlags(request.flags)];
|
||||
|
||||
auto tagItr = priorityThrottledTags.find(tag.first);
|
||||
if(tagItr != priorityThrottledTags.end()) {
|
||||
reply.tagThrottleInfo[tag.first] = tagItr->second;
|
||||
|
|
|
@ -284,13 +284,14 @@ struct RatekeeperLimits {
|
|||
{}
|
||||
};
|
||||
|
||||
struct TransactionCounts {
|
||||
int64_t total;
|
||||
int64_t batch;
|
||||
struct ProxyInfo {
|
||||
int64_t totalTransactions;
|
||||
int64_t batchTransactions;
|
||||
uint64_t lastThrottledTagChangeId;
|
||||
|
||||
double time;
|
||||
double lastUpdateTime;
|
||||
|
||||
TransactionCounts() : total(0), batch(0), time(0) {}
|
||||
ProxyInfo() : totalTransactions(0), batchTransactions(0), lastUpdateTime(0), lastThrottledTagChangeId(0) {}
|
||||
};
|
||||
|
||||
struct RatekeeperData {
|
||||
|
@ -299,7 +300,7 @@ struct RatekeeperData {
|
|||
Map<UID, StorageQueueInfo> storageQueueInfo;
|
||||
Map<UID, TLogQueueInfo> tlogQueueInfo;
|
||||
|
||||
std::map<UID, TransactionCounts> proxy_transactionCounts;
|
||||
std::map<UID, ProxyInfo> proxyInfo;
|
||||
Smoother smoothReleasedTransactions, smoothBatchReleasedTransactions, smoothTotalDurableBytes;
|
||||
HealthMetrics healthMetrics;
|
||||
DatabaseConfiguration configuration;
|
||||
|
@ -311,6 +312,7 @@ struct RatekeeperData {
|
|||
double lastSSListFetchedTimestamp;
|
||||
|
||||
TransactionTagMap<RkTagThrottleData> throttledTags;
|
||||
uint64_t throttledTagChangeId;
|
||||
|
||||
RatekeeperLimits normalLimits;
|
||||
RatekeeperLimits batchLimits;
|
||||
|
@ -322,7 +324,7 @@ struct RatekeeperData {
|
|||
|
||||
RatekeeperData(Database db) : db(db), smoothReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothBatchReleasedTransactions(SERVER_KNOBS->SMOOTHING_AMOUNT), smoothTotalDurableBytes(SERVER_KNOBS->SLOW_SMOOTHING_AMOUNT),
|
||||
actualTpsMetric(LiteralStringRef("Ratekeeper.ActualTPS")),
|
||||
lastWarning(0), lastSSListFetchedTimestamp(now()),
|
||||
lastWarning(0), lastSSListFetchedTimestamp(now()), throttledTagChangeId(0),
|
||||
normalLimits("", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER, SERVER_KNOBS->TARGET_BYTES_PER_TLOG, SERVER_KNOBS->SPRING_BYTES_TLOG, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS),
|
||||
batchLimits("Batch", SERVER_KNOBS->TARGET_BYTES_PER_STORAGE_SERVER_BATCH, SERVER_KNOBS->SPRING_BYTES_STORAGE_SERVER_BATCH, SERVER_KNOBS->TARGET_BYTES_PER_TLOG_BATCH, SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH),
|
||||
autoThrottlingEnabled(false)
|
||||
|
@ -573,6 +575,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
|||
}
|
||||
|
||||
self->throttledTags = std::move(updatedTagThrottles);
|
||||
++self->throttledTagChangeId;
|
||||
|
||||
state Future<Void> watchFuture = tr.watch(tagThrottleSignalKey);
|
||||
wait(tr.commit());
|
||||
|
@ -931,7 +934,7 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits, TransactionTagMa
|
|||
.detail("ReleasedBatchTPS", self->smoothBatchReleasedTransactions.smoothRate())
|
||||
.detail("TPSBasis", actualTps)
|
||||
.detail("StorageServers", sscount)
|
||||
.detail("Proxies", self->proxy_transactionCounts.size())
|
||||
.detail("Proxies", self->proxyInfo.size())
|
||||
.detail("TLogs", tlcount)
|
||||
.detail("WorstFreeSpaceStorageServer", worstFreeSpaceStorageServer)
|
||||
.detail("WorstFreeSpaceTLog", worstFreeSpaceTLog)
|
||||
|
@ -1011,9 +1014,9 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
|||
|
||||
lastLimited = self.smoothReleasedTransactions.smoothRate() > SERVER_KNOBS->LAST_LIMITED_RATIO * self.batchLimits.tpsLimit;
|
||||
double tooOld = now() - 1.0;
|
||||
for(auto p=self.proxy_transactionCounts.begin(); p!=self.proxy_transactionCounts.end(); ) {
|
||||
if (p->second.time < tooOld)
|
||||
p = self.proxy_transactionCounts.erase(p);
|
||||
for(auto p=self.proxyInfo.begin(); p!=self.proxyInfo.end(); ) {
|
||||
if (p->second.lastUpdateTime < tooOld)
|
||||
p = self.proxyInfo.erase(p);
|
||||
else
|
||||
++p;
|
||||
}
|
||||
|
@ -1022,10 +1025,10 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
|||
when (GetRateInfoRequest req = waitNext(rkInterf.getRateInfo.getFuture())) {
|
||||
GetRateInfoReply reply;
|
||||
|
||||
auto& p = self.proxy_transactionCounts[ req.requesterID ];
|
||||
auto& p = self.proxyInfo[ req.requesterID ];
|
||||
//TraceEvent("RKMPU", req.requesterID).detail("TRT", req.totalReleasedTransactions).detail("Last", p.first).detail("Delta", req.totalReleasedTransactions - p.first);
|
||||
if (p.total > 0) {
|
||||
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.total );
|
||||
if (p.totalTransactions > 0) {
|
||||
self.smoothReleasedTransactions.addDelta( req.totalReleasedTransactions - p.totalTransactions );
|
||||
|
||||
for(auto tag : req.throttledTagCounts) {
|
||||
auto itr = self.throttledTags.find(tag.first);
|
||||
|
@ -1034,33 +1037,36 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
|||
}
|
||||
}
|
||||
}
|
||||
if(p.batch > 0) {
|
||||
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batch );
|
||||
if(p.batchTransactions > 0) {
|
||||
self.smoothBatchReleasedTransactions.addDelta( req.batchReleasedTransactions - p.batchTransactions );
|
||||
}
|
||||
|
||||
p.total = req.totalReleasedTransactions;
|
||||
p.batch = req.batchReleasedTransactions;
|
||||
p.time = now();
|
||||
p.totalTransactions = req.totalReleasedTransactions;
|
||||
p.batchTransactions = req.batchReleasedTransactions;
|
||||
p.lastUpdateTime = now();
|
||||
|
||||
reply.transactionRate = self.normalLimits.tpsLimit / self.proxy_transactionCounts.size();
|
||||
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxy_transactionCounts.size();
|
||||
reply.transactionRate = self.normalLimits.tpsLimit / self.proxyInfo.size();
|
||||
reply.batchTransactionRate = self.batchLimits.tpsLimit / self.proxyInfo.size();
|
||||
reply.leaseDuration = SERVER_KNOBS->METRIC_UPDATE_RATE;
|
||||
|
||||
// TODO: avoid iteration every time
|
||||
for(auto itr = self.throttledTags.begin(); itr != self.throttledTags.end();) {
|
||||
for(auto &priorityItr : itr->second.throttleData) {
|
||||
auto &priorityTags = reply.throttledTags[priorityItr.first]; // TODO: report all priorities, not just those at the throttle priority
|
||||
Optional<std::pair<double, double>> clientRate = itr->second.getClientRate(priorityItr.first);
|
||||
if(clientRate.present()) {
|
||||
priorityTags.try_emplace(itr->first, ClientTagThrottleLimits(clientRate.get().first, clientRate.get().second));
|
||||
if(p.lastThrottledTagChangeId != self.throttledTagChangeId) {
|
||||
p.lastThrottledTagChangeId = self.throttedTagChangeId;
|
||||
reply.throttledTags = PrioritizedTransactionTagMap<ClientTagThrottleLimits>();
|
||||
for(auto itr = self.throttledTags.begin(); itr != self.throttledTags.end();) {
|
||||
for(auto &priorityItr : itr->second.throttleData) {
|
||||
auto &priorityTags = reply.throttledTags.get()[priorityItr.first]; // TODO: report all priorities, not just those at the throttle priority
|
||||
Optional<std::pair<double, double>> clientRate = itr->second.getClientRate(priorityItr.first);
|
||||
if(clientRate.present()) {
|
||||
priorityTags.try_emplace(itr->first, ClientTagThrottleLimits(clientRate.get().first, clientRate.get().second));
|
||||
}
|
||||
else if(priorityItr.first == ThrottleApi::Priority::BATCH) {
|
||||
itr = self.throttledTags.erase(itr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if(priorityItr.first == ThrottleApi::Priority::BATCH) {
|
||||
itr = self.throttledTags.erase(itr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
++itr;
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
|
||||
reply.healthMetrics.update(self.healthMetrics, true, req.detailed);
|
||||
|
|
|
@ -72,7 +72,7 @@ struct GetRateInfoReply {
|
|||
double leaseDuration;
|
||||
HealthMetrics healthMetrics;
|
||||
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> throttledTags;
|
||||
Optional<PrioritizedTransactionTagMap<ClientTagThrottleLimits>> throttledTags;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
|
Loading…
Reference in New Issue