Merge pull request #4423 from halfprice/zhewu/proxy-report-ratekeeper-limit

Adding proxy local rate limit metrics reporting back to GrvProxyServer
This commit is contained in:
Jingyu Zhou 2021-03-05 08:49:18 -08:00 committed by GitHub
commit fab8c3f41a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 58 additions and 15 deletions

View File

@ -36,6 +36,14 @@ struct GrvProxyStats {
Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
Counter txnThrottled;
double transactionRateAllowed, batchTransactionRateAllowed;
double transactionLimit, batchTransactionLimit;
// Fraction of the queued GRV requests that was processed in a single attempt to hand out read versions.
double percentageOfDefaultGRVQueueProcessed;
double percentageOfBatchGRVQueueProcessed;
LatencySample defaultTxnGRVTimeInQueue;
LatencySample batchTxnGRVTimeInQueue;
LatencyBands grvLatencyBands;
LatencySample grvLatencySample;
@ -69,18 +77,34 @@ struct GrvProxyStats {
explicit GrvProxyStats(UID id)
: cc("GrvProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE/FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc),
txnRequestErrors("TxnRequestErrors", cc), txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc),
txnStartBatch("TxnStartBatch", cc), txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc),
txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnThrottled("TxnThrottled", cc),
transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0),
percentageOfDefaultGRVQueueProcessed(0), percentageOfBatchGRVQueueProcessed(0),
defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
// The rate at which the limit (budget) is allowed to grow.
specialCounter(cc, "SystemAndDefaultTxnRateAllowed", [this]() { return this->transactionRateAllowed; });
specialCounter(cc, "BatchTransactionRateAllowed", [this]() { return this->batchTransactionRateAllowed; });
specialCounter(cc, "SystemAndDefaultTxnLimit", [this]() { return this->transactionLimit; });
specialCounter(cc, "BatchTransactionLimit", [this]() { return this->batchTransactionLimit; });
specialCounter(cc, "PercentageOfDefaultGRVQueueProcessed",
[this]() { return this->percentageOfDefaultGRVQueueProcessed; });
specialCounter(cc, "PercentageOfBatchGRVQueueProcessed",
[this]() { return this->percentageOfBatchGRVQueueProcessed; });
logger = traceCounters("GrvProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "GrvProxyMetrics");
for(int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
@ -219,13 +243,12 @@ ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy, GetHea
}
}
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount,
int64_t* inBatchTransactionCount, GrvTransactionRateInfo* transactionRateInfo,
GrvTransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags) {
int64_t* inBatchTransactionCount, GrvTransactionRateInfo* transactionRateInfo,
GrvTransactionRateInfo* batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply,
GetHealthMetricsReply* detailedHealthMetricsReply,
TransactionTagMap<uint64_t>* transactionTagCounter,
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* throttledTags, GrvProxyStats* stats) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -266,7 +289,9 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
transactionRateInfo->setRate(rep.transactionRate);
batchTransactionRateInfo->setRate(rep.batchTransactionRate);
//TraceEvent("GrvProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
stats->transactionRateAllowed = rep.transactionRate;
stats->batchTransactionRateAllowed = rep.batchTransactionRate;
//TraceEvent("GrvProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
nextRequestTimer = delayJittered(rep.leaseDuration / 2);
@ -568,7 +593,9 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<
state int64_t midShardSize = SERVER_KNOBS->MIN_SHARD_BYTES;
addActor.send(monitorDDMetricsChanges(&midShardSize, db));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo,
healthMetricsReply, detailedHealthMetricsReply, &transactionTagCounter, &throttledTags,
&grvProxyData->stats));
addActor.send(queueGetReadVersionRequests(db, &systemQueue, &defaultQueue, &batchQueue,
proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime,
&GRVBatchTime, normalGRVLatency.getFuture(), &grvProxyData->stats,
@ -597,6 +624,9 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<
normalRateInfo.reset();
batchRateInfo.reset();
grvProxyData->stats.transactionLimit = normalRateInfo.limit;
grvProxyData->stats.batchTransactionLimit = batchRateInfo.limit;
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
int defaultPriTransactionsStarted[2] = { 0, 0 };
@ -607,6 +637,8 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<
int requestsToStart = 0;
uint32_t defaultQueueSize = defaultQueue.size();
uint32_t batchQueueSize = batchQueue.size();
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
SpannedDeque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
@ -636,12 +668,16 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<
}
transactionsStarted[req.flags&1] += tc;
if (req.priority >= TransactionPriority::IMMEDIATE)
double currentTime = g_network->timer();
if (req.priority >= TransactionPriority::IMMEDIATE) {
systemTransactionsStarted[req.flags & 1] += tc;
else if (req.priority >= TransactionPriority::DEFAULT)
} else if (req.priority >= TransactionPriority::DEFAULT) {
defaultPriTransactionsStarted[req.flags & 1] += tc;
else
grvProxyData->stats.defaultTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
} else {
batchPriTransactionsStarted[req.flags & 1] += tc;
grvProxyData->stats.batchTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime());
}
start[req.flags & 1].push_back(std::move(req));
static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
@ -680,6 +716,8 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<
"GrvProxyServer.transactionStarter.AskLiveCommittedVersionFromMaster");
}
int defaultGRVProcessed = 0;
int batchGRVProcessed = 0;
for (int i = 0; i < start.size(); i++) {
if (start[i].size()) {
Future<GetReadVersionReply> readVersionReply = getLiveCommittedVersion(
@ -691,9 +729,14 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy, Reference<
if (i == 0) {
addActor.send(timeReply(readVersionReply, normalGRVLatency));
}
defaultGRVProcessed += defaultPriTransactionsStarted[i];
batchGRVProcessed += batchPriTransactionsStarted[i];
}
}
span = Span(span.location);
// Ratio of GRVs started in this pass to the queue size sampled before the pass began.
// A queue that was empty at sampling time has a zero divisor; report it as fully processed (1)
// instead of letting 0.0 / 0 publish NaN through the metrics counters.
grvProxyData->stats.percentageOfDefaultGRVQueueProcessed =
    defaultQueueSize ? (double)defaultGRVProcessed / defaultQueueSize : 1;
grvProxyData->stats.percentageOfBatchGRVQueueProcessed =
    batchQueueSize ? (double)batchGRVProcessed / batchQueueSize : 1;
}
}