Various changes to the throttling scheme, including most notably that clients now enforce the throttles with info they receive from the proxy.

This commit is contained in:
A.J. Beamon 2020-04-07 16:28:09 -07:00
parent 2336f073ad
commit a1d8623e5f
8 changed files with 181 additions and 55 deletions

View File

@ -3777,7 +3777,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
}
}
if(tokens.size() == 6) {
char *end;
Optional<uint64_t> parsedDuration = parseDuration(tokens[5].toString());
if(!parsedDuration.present()) {
printf("ERROR: failed to parse duration `%s'.\n", printable(tokens[5]).c_str());
@ -3787,7 +3786,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
duration = parsedDuration.get();
}
wait(ThrottleApi::throttleTag(db, tokens[3], rate, now()+duration, false)); // TODO: express in versions or somehow deal with time?
wait(ThrottleApi::throttleTag(db, tokens[3], rate, now()+duration, true, false));
printf("Tag `%s' has been throttled\n", tokens[3].toString().c_str());
}
else if(tokencmp(tokens[1], "off")) {

View File

@ -165,9 +165,12 @@ public:
UID dbId;
bool internal; // Only contexts created through the C client and fdbcli are non-internal
std::map<TagThrottleInfo::Priority, std::map<Standalone<StringRef>, TagThrottleInfo>> throttledTags;
CounterCollection cc;
Counter transactionReadVersions;
Counter transactionReadVersionsThrottled;
Counter transactionReadVersionsCompleted;
Counter transactionReadVersionBatches;
Counter transactionBatchReadVersions;

View File

@ -1014,12 +1014,29 @@ struct TagThrottleInfo {
bool autoThrottled;
Priority priority;
TagThrottleInfo() : rate(0), expiration(0), autoThrottled(false) {}
TagThrottleInfo(double rate, double expiration, bool autoThrottled, Priority priority) : rate(rate), expiration(expiration), autoThrottled(autoThrottled), priority(priority) {}
bool serializeExpirationAsDuration;
TagThrottleInfo() : rate(0), expiration(0), autoThrottled(false), priority(Priority::DEFAULT), serializeExpirationAsDuration(true) {}
TagThrottleInfo(double rate, double expiration, bool autoThrottled, Priority priority, bool serializeExpirationAsDuration)
: rate(rate), expiration(expiration), autoThrottled(autoThrottled), priority(priority),
serializeExpirationAsDuration(serializeExpirationAsDuration) {}
template<class Ar>
void serialize(Ar& ar) {
serializer(ar, rate, expiration, autoThrottled, priority);
if(ar.isDeserializing) {
serializer(ar, rate, expiration, autoThrottled, priority, serializeExpirationAsDuration);
if(serializeExpirationAsDuration) {
expiration += now();
}
}
else {
double serializedExpiration = expiration;
if(serializeExpirationAsDuration) {
serializedExpiration = std::max(expiration - now(), 0.0);
}
serializer(ar, rate, serializedExpiration, autoThrottled, priority, serializeExpirationAsDuration);
}
}
};

View File

@ -158,11 +158,13 @@ struct GetReadVersionReply {
bool locked;
Optional<Value> metadataVersion;
std::map<Standalone<StringRef>, TagThrottleInfo> tagThrottleInfo;
GetReadVersionReply() : version(invalidVersion), locked(false) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, locked, metadataVersion);
serializer(ar, version, locked, metadataVersion, tagThrottleInfo);
}
};

View File

@ -504,7 +504,7 @@ DatabaseContext::DatabaseContext(
Reference<AsyncVar<Reference<ClusterConnectionFile>>> connectionFile, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
TaskPriority taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, bool internal, int apiVersion, bool switchable )
: connectionFile(connectionFile),clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
lockAware(lockAware), apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
lockAware(lockAware), apiVersion(apiVersion), switchable(switchable), provisional(false), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc), transactionReadVersionBatches("ReadVersionBatches", cc), transactionBatchReadVersions("BatchPriorityReadVersions", cc),
transactionDefaultReadVersions("DefaultPriorityReadVersions", cc), transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc), transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
@ -539,7 +539,7 @@ DatabaseContext::DatabaseContext(
clientStatusUpdater.actor = clientStatusUpdateActor(this);
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc),
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"), transactionReadVersions("ReadVersions", cc), transactionReadVersionsThrottled("ReadVersionsThrottled", cc),
transactionReadVersionsCompleted("ReadVersionsCompleted", cc), transactionReadVersionBatches("ReadVersionBatches", cc), transactionBatchReadVersions("BatchPriorityReadVersions", cc),
transactionDefaultReadVersions("DefaultPriorityReadVersions", cc), transactionImmediateReadVersions("ImmediatePriorityReadVersions", cc),
transactionBatchReadVersionsCompleted("BatchPriorityReadVersionsCompleted", cc), transactionDefaultReadVersionsCompleted("DefaultPriorityReadVersionsCompleted", cc),
@ -3081,6 +3081,25 @@ ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx,
choose {
when ( wait( cx->onMasterProxiesChanged() ) ) {}
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
TagThrottleInfo::Priority priority;
if(req.priority() == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
priority = TagThrottleInfo::Priority::IMMEDIATE;
}
else if(req.priority() == GetReadVersionRequest::PRIORITY_DEFAULT) {
priority = TagThrottleInfo::Priority::DEFAULT;
}
else if(req.priority() == GetReadVersionRequest::PRIORITY_BATCH) {
priority = TagThrottleInfo::Priority::BATCH;
}
else {
ASSERT(false);
}
auto &priorityThrottledTags = cx->throttledTags[priority];
for(auto tag : v.tagThrottleInfo) {
priorityThrottledTags[tag.first] = tag.second;
}
if( debugID.present() )
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
ASSERT( v.version > 0 );
@ -3104,7 +3123,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
state Optional<UID> debugID;
state bool send_batch;
state Standalone<VectorRef<StringRef>> tags;
state std::set<Standalone<StringRef>> tags;
// dynamic batching
state PromiseStream<double> replyTimes;
@ -3122,12 +3141,14 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first());
}
requests.push_back(req.reply);
tags = req.tags;
for(auto tag : req.tags) {
tags.insert(tag);
}
//if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
if (requests.size() == CLIENT_KNOBS->MAX_BATCH_SIZE)
send_batch = true;
//else if (!timeout.isValid())
//timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
else if (!timeout.isValid())
timeout = delay(batchTime, TaskPriority::GetConsistentReadVersion);
}
when(wait(timeout.isValid() ? timeout : Never())) { send_batch = true; }
// dynamic batching monitors reply latencies
@ -3145,8 +3166,14 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
requests.push_back(GRVReply);
addActor.send(ready(timeReply(GRVReply.getFuture(), replyTimes)));
Standalone<VectorRef<StringRef>> tagsToSend;
for(auto tag : tags) {
tagsToSend.push_back_deep(tagsToSend.arena(), tag);
}
tags.clear();
Future<Void> batch = incrementalBroadcastWithError(
getConsistentReadVersion(cx, count, flags, tags, std::move(debugID)),
getConsistentReadVersion(cx, count, flags, tagsToSend, std::move(debugID)),
std::vector<Promise<GetReadVersionReply>>(std::move(requests)), CLIENT_KNOBS->BROADCAST_BATCH_SIZE);
debugID = Optional<UID>();
requests = std::vector< Promise<GetReadVersionReply> >();
@ -3156,7 +3183,7 @@ ACTOR Future<Void> readVersionBatcher( DatabaseContext *cx, FutureStream<Databas
}
}
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion) {
ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Reference<TransactionLogInfo> trLogInfo, Future<GetReadVersionReply> f, bool lockAware, double startTime, Promise<Optional<Value>> metadataVersion, Standalone<VectorRef<StringRef>> tags, double throttledRate) {
GetReadVersionReply rep = wait(f);
double latency = now() - startTime;
cx->GRVLatencies.addSample(latency);
@ -3168,20 +3195,45 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Re
if(rep.locked && !lockAware)
throw database_locked();
TagThrottleInfo::Priority priority;
++cx->transactionReadVersionsCompleted;
if((flags & GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
priority = TagThrottleInfo::Priority::IMMEDIATE;
++cx->transactionImmediateReadVersionsCompleted;
}
else if((flags & GetReadVersionRequest::PRIORITY_DEFAULT) == GetReadVersionRequest::PRIORITY_DEFAULT) {
priority = TagThrottleInfo::Priority::DEFAULT;
++cx->transactionDefaultReadVersionsCompleted;
}
else if((flags & GetReadVersionRequest::PRIORITY_BATCH) == GetReadVersionRequest::PRIORITY_BATCH) {
priority = TagThrottleInfo::Priority::BATCH;
++cx->transactionBatchReadVersionsCompleted;
}
else {
ASSERT(false);
}
auto &priorityThrottledTags = cx->throttledTags[priority];
double newMaxThrottle = throttledRate;
for(auto tag : tags) {
auto itr = priorityThrottledTags.find(tag);
if(itr != priorityThrottledTags.end()) {
if(itr->second.expiration > now()) {
newMaxThrottle = std::max(newMaxThrottle, itr->second.rate);
}
else {
priorityThrottledTags.erase(itr);
}
}
}
// If the throttle rate is higher now than when we made the request, we may still need to throttle this response
if(newMaxThrottle > throttledRate && deterministicRandom()->random01() < 1.0 - (1.0-newMaxThrottle)/(1.0-throttledRate)) {
++cx->transactionReadVersionsThrottled;
throw tag_throttled();
}
if(rep.version > cx->metadataVersionCache[cx->mvCacheInsertLocation].first) {
cx->mvCacheInsertLocation = (cx->mvCacheInsertLocation + 1) % cx->metadataVersionCache.size();
cx->metadataVersionCache[cx->mvCacheInsertLocation] = std::make_pair(rep.version, rep.metadataVersion);
@ -3194,20 +3246,46 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, uint32_t flags, Re
Future<Version> Transaction::getReadVersion(uint32_t flags) {
if (!readVersion.isValid()) {
++cx->transactionReadVersions;
TagThrottleInfo::Priority priority;
flags |= options.getReadVersionFlags;
if((flags & GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
priority = TagThrottleInfo::Priority::IMMEDIATE;
++cx->transactionImmediateReadVersions;
}
else if((flags & GetReadVersionRequest::PRIORITY_DEFAULT) == GetReadVersionRequest::PRIORITY_DEFAULT) {
priority = TagThrottleInfo::Priority::DEFAULT;
++cx->transactionDefaultReadVersions;
}
else if((flags & GetReadVersionRequest::PRIORITY_BATCH) == GetReadVersionRequest::PRIORITY_BATCH) {
priority = TagThrottleInfo::Priority::BATCH;
++cx->transactionBatchReadVersions;
}
else {
ASSERT(false);
}
double maxThrottle = 0;
if(!options.tags.empty()) {
auto priorityThrottledTags = cx->throttledTags[priority];
for(auto tag : options.tags) {
auto itr = priorityThrottledTags.find(tag);
if(itr != priorityThrottledTags.end()) {
if(itr->second.expiration > now()) {
maxThrottle = std::max(maxThrottle, itr->second.rate);
}
else {
priorityThrottledTags.erase(itr);
}
}
}
}
// TODO: can we protect against client spamming this request (e.g. set absolute limits or delay for all transactions starting with throttle tag)
if(maxThrottle > 0 && deterministicRandom()->random01() < maxThrottle) {
++cx->transactionReadVersionsThrottled;
return Future<Version>(tag_throttled());
}
auto& batcher = cx->versionBatcher[ flags ];
if (!batcher.actor.isValid()) {
batcher.actor = readVersionBatcher( cx.getPtr(), batcher.stream.getFuture(), flags );
@ -3216,7 +3294,7 @@ Future<Version> Transaction::getReadVersion(uint32_t flags) {
auto const req = DatabaseContext::VersionRequest(options.tags, info.debugID);
batcher.stream.send(req);
startTime = now();
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion);
readVersion = extractReadVersion( cx.getPtr(), flags, trLogInfo, req.reply.getFuture(), options.lockAware, startTime, metadataVersion, options.tags, maxThrottle);
}
return readVersion;
}

View File

@ -98,8 +98,8 @@ struct ProxyStats {
};
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, std::map<Standalone<StringRef>, double> *throttledTags, std::map<Standalone<StringRef>, double> *throttledBatchTags,
GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply,
std::map<TagThrottleInfo::Priority, std::map<Standalone<StringRef>, TagThrottleInfo>> *throttledTags) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -141,7 +141,6 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
*throttledTags = std::move(rep.throttledTags);
*throttledBatchTags = std::move(rep.throttledBatchTags);
}
when ( wait( leaseTimeout ) ) {
*outTransactionRate = 0;
@ -156,8 +155,6 @@ struct TransactionRateInfo {
double rate;
double limit;
std::map<Standalone<StringRef>, double> throttledTags; // TODO: unordered map?
TransactionRateInfo(double rate) : rate(rate), limit(0) {}
void reset(double elapsed) {
@ -183,8 +180,6 @@ ACTOR Future<Void> queueTransactionStartRequests(
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
std::map<Standalone<StringRef>, double> *throttledTags,
std::map<Standalone<StringRef>, double> *throttledBatchTags,
ProxyStats* stats, TransactionRateInfo* batchRateInfo)
{
loop choose{
@ -202,21 +197,6 @@ ACTOR Future<Void> queueTransactionStartRequests(
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
if(req.priority() != GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
double maxThrottle = 0;
std::map<Standalone<StringRef>, double> *tagsToCheck = (req.priority() == GetReadVersionRequest::PRIORITY_DEFAULT) ? throttledTags : throttledBatchTags;
for(auto& tag : req.tags) {
auto itr = tagsToCheck->find(tag);
if(itr != tagsToCheck->end()) {
maxThrottle = std::max(maxThrottle, itr->second);
}
}
if(deterministicRandom()->random01() < maxThrottle) {
req.reply.sendError(tag_throttled());
continue;
}
}
if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) {
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
}
@ -1239,7 +1219,9 @@ ACTOR Future<Void> updateLastCommit(ProxyCommitData* self, Optional<UID> debugID
return Void();
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID, int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount,
std::map<TagThrottleInfo::Priority)
{
// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
// (1) The version returned is the committedVersion of some proxy at some point before the request returns, so it is committed.
@ -1286,21 +1268,54 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
}
ACTOR Future<Void> sendGrvReplies(Future<GetReadVersionReply> replyFuture, std::vector<GetReadVersionRequest> requests,
ProxyStats* stats, Version minKnownCommittedVersion) {
GetReadVersionReply reply = wait(replyFuture);
ProxyStats* stats, Version minKnownCommittedVersion, std::map<TagThrottleInfo::Priority, std::map<Standalone<StringRef>, TagThrottleInfo>> throttledTags) {
GetReadVersionReply _baseReply = wait(replyFuture);
GetReadVersionReply baseReply = _baseReply;
double end = g_network->timer();
for(GetReadVersionRequest const& request : requests) {
if(request.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->grvLatencyBands.addMeasurement(end - request.requestTime());
}
GetReadVersionReply &reply = baseReply;
if (request.flags & GetReadVersionRequest::FLAG_USE_MIN_KNOWN_COMMITTED_VERSION) {
// Only backup worker may infrequently use this flag.
GetReadVersionReply minKCVReply = reply;
minKCVReply.version = minKnownCommittedVersion;
request.reply.send(minKCVReply);
} else {
request.reply.send(reply);
GetReadVersionReply minKcvReply = reply;
minKcvReply.version = minKnownCommittedVersion;
reply = minKcvReply;
}
reply.tagThrottleInfo.clear();
for(auto tag : request.tags) {
TagThrottleInfo::Priority priority;
if(request.priority() == GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
priority = TagThrottleInfo::Priority::IMMEDIATE;
}
else if(request.priority() == GetReadVersionRequest::PRIORITY_DEFAULT) {
priority = TagThrottleInfo::Priority::DEFAULT;
}
else if(request.priority() == GetReadVersionRequest::PRIORITY_BATCH) {
priority = TagThrottleInfo::Priority::BATCH;
}
else {
ASSERT(false);
}
auto itr = throttledTags.find(priority);
ASSERT(itr != throttledTags.end());
auto tagItr = itr->second.find(tag);
while(tagItr == itr->second.end() && itr != throttledTags.begin()) {
--itr;
}
if(tagItr != itr->second.end()) {
reply.tagThrottleInfo[tag] = tagItr->second;
}
}
request.reply.send(reply);
++stats->txnRequestOut;
}
@ -1328,11 +1343,12 @@ ACTOR static Future<Void> transactionStarter(
state Deque<GetReadVersionRequest> batchQueue;
state vector<MasterProxyInterface> otherProxies;
state std::map<TagThrottleInfo::Priority, std::map<Standalone<StringRef>, TagThrottleInfo>> throttledTags;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, &normalRateInfo.throttledTags, &batchRateInfo.throttledTags, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply, &throttledTags));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &normalRateInfo.throttledTags,
&batchRateInfo.throttledTags, &commitData->stats, &batchRateInfo));
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo));
// Get a list of the other proxies that go together with us
while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end())
@ -1439,7 +1455,7 @@ ACTOR static Future<Void> transactionStarter(
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(commitData, i, &otherProxies, debugID, transactionsStarted[i], systemTransactionsStarted[i], defaultPriTransactionsStarted[i], batchPriTransactionsStarted[i]);
addActor.send(sendGrvReplies(readVersionReply, start[i], &commitData->stats,
commitData->minKnownCommittedVersion));
commitData->minKnownCommittedVersion, throttledTags));
// for now, base dynamic batching on the time for normal requests (not read_risky)
if (i == 0) {

View File

@ -23,6 +23,7 @@
#include "fdbrpc/Smoother.h"
#include "fdbrpc/simulator.h"
#include "fdbclient/ReadYourWrites.h"
#include "fdbclient/TagThrottle.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/RatekeeperInterface.h"
@ -427,9 +428,19 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
TagThrottleInfo throttleInfo = decodeTagThrottleValue(entry.value);
TraceEvent("RatekeeperReadThrottleRead").detail("Tag", tag).detail("Expiration", throttleInfo.expiration);
if((!self->autoThrottlingEnabled && throttleInfo.autoThrottled) || throttleInfo.expiration <= now()) { // TODO: keep or delete auto throttles when disabling auto-throttling
tr.clear(tag);
tr.clear(entry.key);
}
else {
// Convert serialized version to absolute time
if(throttleInfo.serializeExpirationAsDuration) {
throttleInfo.serializeExpirationAsDuration = false;
BinaryWriter wr(IncludeVersion());
wr << throttleInfo;
state Value value = wr.toValue();
tr.set(entry.key, value);
}
auto oldItr = oldThrottleIterators[throttleInfo.priority];
while(oldItr.first != oldItr.second && oldItr.first->first < tag) {
++oldItr.first;
@ -917,9 +928,10 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
// TODO: avoid iteration every time
for(auto priorityItr = self.tagThrottles.begin(); priorityItr != self.tagThrottles.end(); ++priorityItr) {
auto &priorityTags = reply.throttledTags[priorityItr->first];
for(auto tagItr = priorityItr->second.begin(); tagItr != priorityItr->second.end();) {
if(tagItr->second.expiration > now()) {
reply.throttledTags[tagItr->first] = tagItr->second.rate;
priorityTags[tagItr->first] = tagItr->second;
++tagItr;
}
else {

View File

@ -59,12 +59,11 @@ struct GetRateInfoReply {
double leaseDuration;
HealthMetrics healthMetrics;
std::map<Standalone<StringRef>, double> throttledTags;
std::map<Standalone<StringRef>, double> throttledBatchTags;
std::map<TagThrottleInfo::Priority, std::map<Standalone<StringRef>, TagThrottleInfo>> throttledTags;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics, throttledTags, throttledBatchTags);
serializer(ar, transactionRate, batchTransactionRate, leaseDuration, healthMetrics, throttledTags);
}
};