Merge pull request #10572 from sfc-gh-tclinkenbeard/main-decouple-token-bucket-knobs

Decouple token bucket knobs for different types of throttlers
This commit is contained in:
Trevor Clinkenbeard 2023-06-28 12:29:31 -07:00 committed by GitHub
commit 4ec3eac0e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 35 additions and 21 deletions

View File

@ -548,7 +548,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( POLLING_FREQUENCY, 2.0 ); if( longLeaderElection ) POLLING_FREQUENCY = 8.0;
init( HEARTBEAT_FREQUENCY, 0.5 ); if( longLeaderElection ) HEARTBEAT_FREQUENCY = 1.0;
// Commit CommitProxy and GRV CommitProxy
// Commit Proxy and GRV Proxy
init( START_TRANSACTION_BATCH_INTERVAL_MIN, 1e-6 );
init( START_TRANSACTION_BATCH_INTERVAL_MAX, 0.010 );
init( START_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION, 0.5 );
@ -557,7 +557,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 100000 );
init( START_TRANSACTION_MAX_REQUESTS_TO_START, 10000 );
init( START_TRANSACTION_RATE_WINDOW, 2.0 );
init( TAG_THROTTLE_RATE_WINDOW, 2.0 );
init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 );
init( TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET, 1000.0 );
init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 );
init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 );
init( TENANT_ID_REQUEST_MAX_QUEUE_SIZE, 1e6 );

View File

@ -495,7 +495,7 @@ public:
double POLLING_FREQUENCY;
double HEARTBEAT_FREQUENCY;
// Commit CommitProxy
// Commit Proxy and GRV Proxy
double START_TRANSACTION_BATCH_INTERVAL_MIN;
double START_TRANSACTION_BATCH_INTERVAL_MAX;
double START_TRANSACTION_BATCH_INTERVAL_LATENCY_FRACTION;
@ -504,7 +504,9 @@ public:
double START_TRANSACTION_MAX_TRANSACTIONS_TO_START;
int START_TRANSACTION_MAX_REQUESTS_TO_START;
double START_TRANSACTION_RATE_WINDOW;
double TAG_THROTTLE_RATE_WINDOW;
double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET;
double TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET;
int START_TRANSACTION_MAX_QUEUE_SIZE;
int KEY_LOCATION_MAX_QUEUE_SIZE;
int TENANT_ID_REQUEST_MAX_QUEUE_SIZE;

View File

@ -833,8 +833,12 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
state int64_t transactionCount = 0;
state int64_t batchTransactionCount = 0;
state GrvTransactionRateInfo normalRateInfo(10);
state GrvTransactionRateInfo batchRateInfo(0);
state GrvTransactionRateInfo normalRateInfo(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW,
SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET,
/*rate=*/10);
state GrvTransactionRateInfo batchRateInfo(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW,
SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET,
/*rate=*/0);
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;

View File

@ -40,7 +40,8 @@ void GrvProxyTagThrottler::TagQueue::setRate(double rate) {
if (rateInfo.present()) {
rateInfo.get().setRate(rate);
} else {
rateInfo = GrvTransactionRateInfo(rate);
rateInfo = GrvTransactionRateInfo(
SERVER_KNOBS->TAG_THROTTLE_RATE_WINDOW, SERVER_KNOBS->TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET, rate);
}
}
@ -473,9 +474,9 @@ TEST_CASE("/GrvProxyTagThrottler/Fifo") {
// Tests that while throughput is low, the tag throttler
// does not accumulate too much budget.
//
// A server is setup to server 10 transactions per second,
// A server is setup to server 100 transactions per second,
// then runs idly for 60 seconds. Then a client starts
// and attempts 20 transactions per second for 60 seconds.
// and attempts 200 transactions per second for 60 seconds.
// The server throttles the client to only achieve
// 10 transactions per second during this 60 second window.
// If the throttler is allowed to accumulate budget indefinitely
@ -486,16 +487,16 @@ TEST_CASE("/GrvProxyTagThrottler/LimitedIdleBudget") {
state TransactionTagMap<uint32_t> counters;
{
TransactionTagMap<double> rates;
rates["sampleTag"_sr] = 10.0;
rates["sampleTag"_sr] = 100.0;
throttler.updateRates(rates);
}
tagSet.addTag("sampleTag"_sr);
state Future<Void> server = mockServer(&throttler);
wait(delay(60.0));
state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters);
state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 200.0, &counters);
wait(timeout(client && server, 60.0, Void()));
TraceEvent("TagQuotaTest_LimitedIdleBudget").detail("Counter", counters["sampleTag"_sr]);
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 100.0));
return Void();
}

View File

@ -24,9 +24,9 @@
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // must be last include
GrvTransactionRateInfo::GrvTransactionRateInfo(double rate)
: rate(rate), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {
GrvTransactionRateInfo::GrvTransactionRateInfo(double rateWindow, double maxEmptyQueueBudget, double rate)
: rateWindow(rateWindow), maxEmptyQueueBudget(maxEmptyQueueBudget), rate(rate), smoothRate(rateWindow),
smoothReleased(rateWindow) {
smoothRate.setTotal(rate);
}
@ -50,14 +50,14 @@ void GrvTransactionRateInfo::endReleaseWindow(int64_t numStarted, bool queueEmpt
// accumulate budget over time in the case that our batches are too big to take advantage of the rate window based
// limits.
//
// Note that "rate window" here indicates a period of SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW seconds,
// Note that "rate window" here indicates a period of rateWindow seconds,
// whereas "release window" is the period between wait statements, with duration indicated by "elapsed."
budget = std::max(0.0, budget + elapsed * (limit - numStarted) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);
budget = std::max(0.0, budget + elapsed * (limit - numStarted) / rateWindow);
// If we are emptying out the queue of requests, then we don't need to carry much budget forward
// If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised
if (queueEmpty) {
budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
budget = std::min(budget, maxEmptyQueueBudget);
}
smoothReleased.addDelta(numStarted);
@ -91,7 +91,7 @@ void GrvTransactionRateInfo::startReleaseWindow() {
// Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the
// use of our budget or because of higher priority transactions).
double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate;
limit = rateWindow * releaseRate;
}
static bool isNear(double desired, int64_t actual) {
@ -112,7 +112,7 @@ ACTOR static Future<Void> mockClient(GrvTransactionRateInfo* rateInfo, double de
// Rate limit set at 10, but client attempts 20 transactions per second.
// Client should be throttled to only 10 transactions per second.
TEST_CASE("/GrvTransactionRateInfo/Simple") {
state GrvTransactionRateInfo rateInfo;
state GrvTransactionRateInfo rateInfo(/*rateWindow=*/2.0, /*maxEmptyQueueBudget=*/100, /*rate=*/10);
state int64_t counter;
rateInfo.setRate(10.0);
wait(timeout(mockClient(&rateInfo, 20.0, &counter), 60.0, Void()));

View File

@ -55,7 +55,10 @@ class GrvProxyTagThrottler {
Deque<DelayedRequest> requests;
TagQueue() = default;
explicit TagQueue(double rate) : rateInfo(rate) {}
explicit TagQueue(double rate)
: rateInfo(GrvTransactionRateInfo(SERVER_KNOBS->TAG_THROTTLE_RATE_WINDOW,
SERVER_KNOBS->TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET,
rate)) {}
void setRate(double rate);
bool isMaxThrottled(double maxThrottleDuration) const;

View File

@ -34,7 +34,9 @@
//
// Smoothers are used to avoid turbulent throttling behaviour.
class GrvTransactionRateInfo {
double rate = 0.0;
double rateWindow{ 1.0 };
double maxEmptyQueueBudget{ 0.0 };
double rate{ 0.0 };
double limit{ 0.0 };
double budget{ 0.0 };
bool disabled{ true };
@ -42,7 +44,7 @@ class GrvTransactionRateInfo {
Smoother smoothReleased;
public:
explicit GrvTransactionRateInfo(double rate = 0.0);
GrvTransactionRateInfo(double rateWindow, double maxEmptyQueueBudget, double rate);
// Determines the number of transactions that this proxy is allowed to release
// in this release window.