diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 04bc8dec96..b87b0e65a6 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -36,6 +36,8 @@ struct GrvProxyStats { Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut; Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut; Counter txnThrottled; + Counter updatesFromRatekeeper, leaseTimeouts; + int systemGRVQueueSize, defaultGRVQueueSize, batchGRVQueueSize; double transactionRateAllowed, batchTransactionRateAllowed; double transactionLimit, batchTransactionLimit; // how much of the GRV requests queue was processed in one attempt to hand out read version. @@ -89,12 +91,13 @@ struct GrvProxyStats { txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc), txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc), txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnThrottled("TxnThrottled", cc), - transactionRateAllowed(0), batchTransactionRateAllowed(0), transactionLimit(0), batchTransactionLimit(0), - percentageOfDefaultGRVQueueProcessed(0), percentageOfBatchGRVQueueProcessed(0), - defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", - id, - SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, - SERVER_KNOBS->LATENCY_SAMPLE_SIZE), + updatesFromRatekeeper("UpdatesFromRatekeeper", cc), leaseTimeouts("LeaseTimeouts", cc), systemGRVQueueSize(0), + defaultGRVQueueSize(0), batchGRVQueueSize(0), transactionRateAllowed(0), batchTransactionRateAllowed(0), + transactionLimit(0), batchTransactionLimit(0), percentageOfDefaultGRVQueueProcessed(0), + percentageOfBatchGRVQueueProcessed(0), defaultTxnGRVTimeInQueue("DefaultTxnGRVTimeInQueue", + id, + SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, + SERVER_KNOBS->LATENCY_SAMPLE_SIZE), batchTxnGRVTimeInQueue("BatchTxnGRVTimeInQueue", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL, @@ -109,6 +112,9 @@ struct GrvProxyStats { SERVER_KNOBS->LATENCY_SAMPLE_SIZE), grvLatencyBands("GRVLatencyBands", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) { // The rate at which the limit(budget) is allowed to grow. + specialCounter(cc, "SystemGRVQueueSize", [this]() { return this->systemGRVQueueSize; }); + specialCounter(cc, "DefaultGRVQueueSize", [this]() { return this->defaultGRVQueueSize; }); + specialCounter(cc, "BatchGRVQueueSize", [this]() { return this->batchGRVQueueSize; }); specialCounter( cc, "SystemAndDefaultTxnRateAllowed", [this]() { return int64_t(this->transactionRateAllowed); }); specialCounter( @@ -267,6 +273,7 @@ ACTOR Future healthMetricsRequestServer(GrvProxyInterface grvProxy, } } +// Get transaction rate info from RateKeeper. ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, @@ -320,6 +327,7 @@ ACTOR Future getRate(UID myID, batchTransactionRateInfo->setRate(rep.batchTransactionRate); stats->transactionRateAllowed = rep.transactionRate; stats->batchTransactionRateAllowed = rep.batchTransactionRate; + ++stats->updatesFromRatekeeper; //TraceEvent("GrvProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC); lastTC = *inTransactionCount; leaseTimeout = delay(rep.leaseDuration); @@ -339,6 +347,7 @@ ACTOR Future getRate(UID myID, when(wait(leaseTimeout)) { transactionRateInfo->disable(); batchTransactionRateInfo->disable(); + ++stats->leaseTimeouts; TraceEvent(SevWarn, "GrvProxyRateLeaseExpired", myID).suppressFor(5.0); //TraceEvent("GrvProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", 0); leaseTimeout = Never(); @@ -390,14 +399,17 @@ ACTOR Future queueGetReadVersionRequests(Reference> } else if (req.priority == TransactionPriority::DEFAULT) { if (!batchQueue->empty()) { dropRequestFromQueue(batchQueue, stats); + --stats->batchGRVQueueSize; } else { canBeQueued = false; } } else { if (!batchQueue->empty()) { dropRequestFromQueue(batchQueue, stats); + --stats->batchGRVQueueSize; } else if (!defaultQueue->empty()) { dropRequestFromQueue(defaultQueue, stats); + --stats->defaultGRVQueueSize; } else { canBeQueued = false; } @@ -427,12 +439,14 @@ ACTOR Future queueGetReadVersionRequests(Reference> ++stats->txnRequestIn; stats->txnStartIn += req.transactionCount; stats->txnSystemPriorityStartIn += req.transactionCount; + ++stats->systemGRVQueueSize; systemQueue->push_back(req); systemQueue->span.addParent(req.spanContext); } else if (req.priority >= TransactionPriority::DEFAULT) { ++stats->txnRequestIn; stats->txnStartIn += req.transactionCount; stats->txnDefaultPriorityStartIn += req.transactionCount; + ++stats->defaultGRVQueueSize; defaultQueue->push_back(req); defaultQueue->span.addParent(req.spanContext); } else { @@ -445,6 +459,7 @@ ACTOR Future queueGetReadVersionRequests(Reference> ++stats->txnRequestIn; stats->txnStartIn += req.transactionCount; stats->txnBatchPriorityStartIn += req.transactionCount; + ++stats->batchGRVQueueSize; batchQueue->push_back(req); batchQueue->span.addParent(req.spanContext); } @@ -791,12 +806,15 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, double currentTime = g_network->timer(); if (req.priority >= TransactionPriority::IMMEDIATE) { systemTransactionsStarted[req.flags & 1] += tc; + --grvProxyData->stats.systemGRVQueueSize; } else if (req.priority >= TransactionPriority::DEFAULT) { defaultPriTransactionsStarted[req.flags & 1] += tc; grvProxyData->stats.defaultTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime()); + --grvProxyData->stats.defaultGRVQueueSize; } else { batchPriTransactionsStarted[req.flags & 1] += tc; grvProxyData->stats.batchTxnGRVTimeInQueue.addMeasurement(currentTime - req.requestTime()); + --grvProxyData->stats.batchGRVQueueSize; } start[req.flags & 1].push_back(std::move(req));