Decouple token bucket knobs for different types of throttlers

This commit is contained in:
sfc-gh-tclinkenbeard 2023-06-27 15:27:52 -07:00
parent cf3b5d8c57
commit 6e86f94cb9
7 changed files with 28 additions and 14 deletions

View File

@ -557,7 +557,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 100000 );
init( START_TRANSACTION_MAX_REQUESTS_TO_START, 10000 );
init( START_TRANSACTION_RATE_WINDOW, 2.0 );
init( TAG_THROTTLE_RATE_WINDOW, 2.0 );
init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 );
init( TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET, 10.0 );
init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 );
init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 );
init( TENANT_ID_REQUEST_MAX_QUEUE_SIZE, 1e6 );

View File

@ -504,7 +504,9 @@ public:
double START_TRANSACTION_MAX_TRANSACTIONS_TO_START;
int START_TRANSACTION_MAX_REQUESTS_TO_START;
double START_TRANSACTION_RATE_WINDOW;
double TAG_THROTTLE_RATE_WINDOW;
double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET;
double TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET;
int START_TRANSACTION_MAX_QUEUE_SIZE;
int KEY_LOCATION_MAX_QUEUE_SIZE;
int TENANT_ID_REQUEST_MAX_QUEUE_SIZE;

View File

@ -833,8 +833,12 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
state int64_t transactionCount = 0;
state int64_t batchTransactionCount = 0;
state GrvTransactionRateInfo normalRateInfo(10);
state GrvTransactionRateInfo batchRateInfo(0);
state GrvTransactionRateInfo normalRateInfo(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW,
SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET,
/*rate=*/10);
state GrvTransactionRateInfo batchRateInfo(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW,
SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET,
/*rate=*/0);
state Deque<GetReadVersionRequest> systemQueue;
state Deque<GetReadVersionRequest> defaultQueue;

View File

@ -40,7 +40,8 @@ void GrvProxyTagThrottler::TagQueue::setRate(double rate) {
if (rateInfo.present()) {
rateInfo.get().setRate(rate);
} else {
rateInfo = GrvTransactionRateInfo(rate);
rateInfo = GrvTransactionRateInfo(
SERVER_KNOBS->TAG_THROTTLE_RATE_WINDOW, SERVER_KNOBS->TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET, rate);
}
}

View File

@ -24,9 +24,9 @@
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // must be last include
GrvTransactionRateInfo::GrvTransactionRateInfo(double rate)
: rate(rate), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {
GrvTransactionRateInfo::GrvTransactionRateInfo(double rateWindow, double maxEmptyQueueBudget, double rate)
: rateWindow(rateWindow), maxEmptyQueueBudget(maxEmptyQueueBudget), rate(rate), smoothRate(rateWindow),
smoothReleased(rateWindow) {
smoothRate.setTotal(rate);
}
@ -50,14 +50,14 @@ void GrvTransactionRateInfo::endReleaseWindow(int64_t numStarted, bool queueEmpt
// accumulate budget over time in the case that our batches are too big to take advantage of the rate window based
// limits.
//
// Note that "rate window" here indicates a period of SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW seconds,
// Note that "rate window" here indicates a period of rateWindow seconds,
// whereas "release window" is the period between wait statements, with duration indicated by "elapsed."
budget = std::max(0.0, budget + elapsed * (limit - numStarted) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);
budget = std::max(0.0, budget + elapsed * (limit - numStarted) / rateWindow);
// If we are emptying out the queue of requests, then we don't need to carry much budget forward
// If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised
if (queueEmpty) {
budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
budget = std::min(budget, maxEmptyQueueBudget);
}
smoothReleased.addDelta(numStarted);
@ -91,7 +91,7 @@ void GrvTransactionRateInfo::startReleaseWindow() {
// Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the
// use of our budget or because of higher priority transactions).
double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate;
limit = rateWindow * releaseRate;
}
static bool isNear(double desired, int64_t actual) {
@ -112,7 +112,7 @@ ACTOR static Future<Void> mockClient(GrvTransactionRateInfo* rateInfo, double de
// Rate limit set at 10, but client attempts 20 transactions per second.
// Client should be throttled to only 10 transactions per second.
TEST_CASE("/GrvTransactionRateInfo/Simple") {
state GrvTransactionRateInfo rateInfo;
state GrvTransactionRateInfo rateInfo(/*rateWindow=*/2.0, /*maxEmptyQueueBudget=*/100, /*rate=*/10);
state int64_t counter;
rateInfo.setRate(10.0);
wait(timeout(mockClient(&rateInfo, 20.0, &counter), 60.0, Void()));

View File

@ -55,7 +55,10 @@ class GrvProxyTagThrottler {
Deque<DelayedRequest> requests;
TagQueue() = default;
explicit TagQueue(double rate) : rateInfo(rate) {}
explicit TagQueue(double rate)
: rateInfo(GrvTransactionRateInfo(SERVER_KNOBS->TAG_THROTTLE_RATE_WINDOW,
SERVER_KNOBS->TAG_THROTTLE_MAX_EMPTY_QUEUE_BUDGET,
rate)) {}
void setRate(double rate);
bool isMaxThrottled(double maxThrottleDuration) const;

View File

@ -34,7 +34,9 @@
//
// Smoothers are used to avoid turbulent throttling behaviour.
class GrvTransactionRateInfo {
double rate = 0.0;
double rateWindow{ 1.0 };
double maxEmptyQueueBudget{ 0.0 };
double rate{ 0.0 };
double limit{ 0.0 };
double budget{ 0.0 };
bool disabled{ true };
@ -42,7 +44,7 @@ class GrvTransactionRateInfo {
Smoother smoothReleased;
public:
explicit GrvTransactionRateInfo(double rate = 0.0);
GrvTransactionRateInfo(double rateWindow, double maxEmptyQueueBudget, double rate);
// Determines the number of transactions that this proxy is allowed to release
// in this release window.