Merge branch 'main' of https://github.com/apple/foundationdb into fix-head

Jingyu Zhou 2022-11-01 20:25:41 -07:00
commit 89857c4be0
8 changed files with 60 additions and 56 deletions

View File

@ -273,7 +273,6 @@ void ClientKnobs::initialize(Randomize randomize) {
init( WRITE_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) WRITE_COST_BYTE_FACTOR = 4096;
init( READ_COST_BYTE_FACTOR, 16384 ); if( randomize && BUGGIFY ) READ_COST_BYTE_FACTOR = 4096;
init( GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO, 5.0 );
- init( PROXY_MAX_TAG_THROTTLE_DURATION, 5.0 ); if( randomize && BUGGIFY ) PROXY_MAX_TAG_THROTTLE_DURATION = 0.5;
// busyness reporting
init( BUSYNESS_SPIKE_START_THRESHOLD, 0.100 );

View File

@ -736,6 +736,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
init( GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED, 10 );
init( GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER, 240.0 );
+ init( PROXY_MAX_TAG_THROTTLE_DURATION, 5.0 ); if( randomize && BUGGIFY ) PROXY_MAX_TAG_THROTTLE_DURATION = 0.5;
init( GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL, 60.0 );
//Storage Metrics

View File

@ -264,8 +264,6 @@ public:
int64_t READ_COST_BYTE_FACTOR; // Used to round up the cost of read operations
// Cost multiplier for writes (because write operations are more expensive than reads):
double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO;
- double PROXY_MAX_TAG_THROTTLE_DURATION; // Maximum duration that a transaction can be tag throttled by proxy before
- // being rejected
// busyness reporting
double BUSYNESS_SPIKE_START_THRESHOLD;
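
The byte factor and fungibility ratio above determine how operation sizes translate into throttling cost. As a rough illustration only (the exact FoundationDB cost formulas live elsewhere and are not part of this diff): operation sizes are rounded up to whole byte-factor units, and writes are weighted more heavily via the read/write fungibility ratio.

// Illustrative sketch only, using the default knob values shown above; the real
// FoundationDB cost formulas are defined elsewhere and may differ in detail.
#include <cstdint>
#include <cstdio>

static constexpr int64_t READ_COST_BYTE_FACTOR = 16384;
static constexpr double GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO = 5.0;

// Round an operation's size up to whole byte-factor units.
int64_t approxReadCost(int64_t bytes) {
    return (bytes + READ_COST_BYTE_FACTOR - 1) / READ_COST_BYTE_FACTOR;
}

// Writes count for more than reads of the same size.
double approxWriteCost(int64_t bytes) {
    return GLOBAL_TAG_THROTTLING_RW_FUNGIBILITY_RATIO * approxReadCost(bytes);
}

int main() {
    std::printf("100 B read  -> %lld unit(s)\n", (long long)approxReadCost(100));   // 1
    std::printf("20 KB read  -> %lld unit(s)\n", (long long)approxReadCost(20000)); // 2
    std::printf("20 KB write -> %.1f unit(s)\n", approxWriteCost(20000));           // 10.0
}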

View File

@ -636,6 +636,8 @@ public:
// Global tag throttler forgets about throughput from a tag once no new transactions from that
// tag have been received for this duration (in seconds):
int64_t GLOBAL_TAG_THROTTLING_TAG_EXPIRE_AFTER;
+ // Maximum duration that a transaction can be tag throttled by proxy before being rejected
+ double PROXY_MAX_TAG_THROTTLE_DURATION;
// Interval at which latency bands are logged for each tag on grv proxy
double GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL;

View File

@ -216,7 +216,8 @@ struct GrvProxyData {
dbgid,
SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
- updateCommitRequests(0), lastCommitTime(0), version(0), minKnownCommittedVersion(invalidVersion) {}
+ updateCommitRequests(0), lastCommitTime(0), version(0), minKnownCommittedVersion(invalidVersion),
+ tagThrottler(SERVER_KNOBS->PROXY_MAX_TAG_THROTTLE_DURATION) {}
};
ACTOR Future<Void> healthMetricsRequestServer(GrvProxyInterface grvProxy,

View File

@ -33,8 +33,8 @@ void GrvProxyTransactionTagThrottler::DelayedRequest::updateProxyTagThrottledDur
latencyBandsMap.addMeasurement(tag, req.proxyTagThrottledDuration, count);
}
- bool GrvProxyTransactionTagThrottler::DelayedRequest::isMaxThrottled() const {
- return now() - startTime > CLIENT_KNOBS->PROXY_MAX_TAG_THROTTLE_DURATION;
+ bool GrvProxyTransactionTagThrottler::DelayedRequest::isMaxThrottled(double maxThrottleDuration) const {
+ return now() - startTime > maxThrottleDuration;
}
void GrvProxyTransactionTagThrottler::TagQueue::setRate(double rate) {
@ -45,8 +45,8 @@ void GrvProxyTransactionTagThrottler::TagQueue::setRate(double rate) {
}
}
- bool GrvProxyTransactionTagThrottler::TagQueue::isMaxThrottled() const {
- return !requests.empty() && requests.front().isMaxThrottled();
+ bool GrvProxyTransactionTagThrottler::TagQueue::isMaxThrottled(double maxThrottleDuration) const {
+ return !requests.empty() && requests.front().isMaxThrottled(maxThrottleDuration);
}
void GrvProxyTransactionTagThrottler::TagQueue::rejectRequests(LatencyBandsMap& latencyBandsMap) {
@ -59,8 +59,9 @@ void GrvProxyTransactionTagThrottler::TagQueue::rejectRequests(LatencyBandsMap&
}
}
- GrvProxyTransactionTagThrottler::GrvProxyTransactionTagThrottler()
- : latencyBandsMap("GrvProxyTagThrottler",
+ GrvProxyTransactionTagThrottler::GrvProxyTransactionTagThrottler(double maxThrottleDuration)
+ : maxThrottleDuration(maxThrottleDuration),
+ latencyBandsMap("GrvProxyTagThrottler",
deterministicRandom()->randomUniqueID(),
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_PROXY_LOGGING_INTERVAL,
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MAX_TAGS_TRACKED) {}
@ -170,7 +171,7 @@ void GrvProxyTransactionTagThrottler::releaseTransactions(double elapsed,
// Cannot release any more transaction from this tag (don't push the tag queue handle back into
// pqOfQueues)
CODE_PROBE(true, "GrvProxyTransactionTagThrottler throttling transaction");
- if (tagQueueHandle.queue->isMaxThrottled()) {
+ if (tagQueueHandle.queue->isMaxThrottled(maxThrottleDuration)) {
// Requests in this queue have been throttled too long and errors
// should be sent to clients.
tagQueueHandle.queue->rejectRequests(latencyBandsMap);
@ -305,6 +306,7 @@ ACTOR static Future<Void> mockServer(GrvProxyTransactionTagThrottler* throttler)
outBatchPriority.front().reply.send(GetReadVersionReply{});
outBatchPriority.pop_front();
}
TraceEvent("HERE_ServerProcessing").detail("Size", outDefaultPriority.size());
while (!outDefaultPriority.empty()) {
outDefaultPriority.front().reply.send(GetReadVersionReply{});
outDefaultPriority.pop_front();
@ -329,7 +331,7 @@ static bool isNear(double desired, int64_t actual) {
// Rate limit set at 10, but client attempts 20 transactions per second.
// Client should be throttled to only 10 transactions per second.
TEST_CASE("/GrvProxyTransactionTagThrottler/Simple") {
- state GrvProxyTransactionTagThrottler throttler;
+ state GrvProxyTransactionTagThrottler throttler(5.0);
state TagSet tagSet;
state TransactionTagMap<uint32_t> counters;
{
@ -349,7 +351,7 @@ TEST_CASE("/GrvProxyTransactionTagThrottler/Simple") {
// Clients share the available 30 transaction/second budget
TEST_CASE("/GrvProxyTransactionTagThrottler/MultiClient") {
- state GrvProxyTransactionTagThrottler throttler;
+ state GrvProxyTransactionTagThrottler throttler(5.0);
state TagSet tagSet;
state TransactionTagMap<uint32_t> counters;
{
@ -374,7 +376,7 @@ TEST_CASE("/GrvProxyTransactionTagThrottler/MultiClient") {
// Test processing GetReadVersionRequests that batch several transactions
TEST_CASE("/GrvProxyTransactionTagThrottler/Batch") {
- state GrvProxyTransactionTagThrottler throttler;
+ state GrvProxyTransactionTagThrottler throttler(5.0);
state TagSet tagSet;
state TransactionTagMap<uint32_t> counters;
{
@ -395,7 +397,7 @@ TEST_CASE("/GrvProxyTransactionTagThrottler/Batch") {
// Tests cleanup of tags that are no longer throttled.
TEST_CASE("/GrvProxyTransactionTagThrottler/Cleanup1") {
- GrvProxyTransactionTagThrottler throttler;
+ GrvProxyTransactionTagThrottler throttler(5.0);
for (int i = 0; i < 1000; ++i) {
auto const tag = getRandomTag();
TransactionTagMap<double> rates;
@ -408,7 +410,7 @@ TEST_CASE("/GrvProxyTransactionTagThrottler/Cleanup1") {
// Tests cleanup of tags once queues have been emptied
TEST_CASE("/GrvProxyTransactionTagThrottler/Cleanup2") {
- GrvProxyTransactionTagThrottler throttler;
+ GrvProxyTransactionTagThrottler throttler(5.0);
{
GetReadVersionRequest req;
req.tags["sampleTag"_sr] = 1;
@ -432,7 +434,7 @@ TEST_CASE("/GrvProxyTransactionTagThrottler/Cleanup2") {
// Tests that unthrottled transactions are released in FIFO order, even when they
// have different tags
TEST_CASE("/GrvProxyTransactionTagThrottler/Fifo") {
- state GrvProxyTransactionTagThrottler throttler;
+ state GrvProxyTransactionTagThrottler throttler(5.0);
state Future<Void> server = mockServer(&throttler);
wait(mockFifoClient(&throttler));
return Void();

View File

@ -47,7 +47,7 @@ class GrvProxyTransactionTagThrottler {
: req(req), startTime(now()), sequenceNumber(++lastSequenceNumber) {}
void updateProxyTagThrottledDuration(LatencyBandsMap&);
- bool isMaxThrottled() const;
+ bool isMaxThrottled(double maxThrottleDuration) const;
};
struct TagQueue {
@ -58,18 +58,19 @@ class GrvProxyTransactionTagThrottler {
explicit TagQueue(double rate) : rateInfo(rate) {}
void setRate(double rate);
- bool isMaxThrottled() const;
+ bool isMaxThrottled(double maxThrottleDuration) const;
void rejectRequests(LatencyBandsMap&);
};
// Track the budgets for each tag
TransactionTagMap<TagQueue> queues;
+ double maxThrottleDuration;
// Track latency bands for each tag
LatencyBandsMap latencyBandsMap;
public:
- GrvProxyTransactionTagThrottler();
+ explicit GrvProxyTransactionTagThrottler(double maxThrottleDuration);
// Called with rates received from ratekeeper
void updateRates(TransactionTagMap<double> const& newRates);
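
Taken together with the .cpp changes above, the header now shows the shape of the new API: the maximum throttle duration is injected once through the constructor rather than read from CLIENT_KNOBS at each check, and it is threaded through isMaxThrottled(). A standalone sketch of that rule, using hypothetical plain-C++ types (std::chrono in place of flow's now(); not the FoundationDB classes):

// Standalone sketch, not the FDB implementation: a delayed request remembers when
// it was queued, and a tag queue counts as "max throttled" once its oldest request
// has waited longer than the injected maxThrottleDuration (seconds), at which point
// requests are rejected rather than delayed further.
#include <chrono>
#include <deque>

using Clock = std::chrono::steady_clock;

struct DelayedRequest {
    Clock::time_point startTime = Clock::now();

    bool isMaxThrottled(double maxThrottleDuration) const {
        return std::chrono::duration<double>(Clock::now() - startTime).count() > maxThrottleDuration;
    }
};

struct TagQueue {
    std::deque<DelayedRequest> requests;

    // Only the oldest (front) request needs to be checked, mirroring TagQueue::isMaxThrottled above.
    bool isMaxThrottled(double maxThrottleDuration) const {
        return !requests.empty() && requests.front().isMaxThrottled(maxThrottleDuration);
    }

    // Stand-in for rejectRequests(): in the real proxy each rejected client receives an error reply.
    void rejectAll() { requests.clear(); }
};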

View File

@ -42,21 +42,21 @@
#endif
// A multi user lock with a concurrent holder limit where waiters request a lock with a priority
- // id and are granted locks based on a total concurrency and relative importants of the priority
- // ids defined.
+ // id and are granted locks based on a total concurrency and relative weights of the current active
+ // priorities. Priority id's must start at 0 and are sequential integers.
//
// Scheduling logic
// Let
- // launchLimits[n] = configured amount from the launchLimit vector for priority n
+ // weights[n] = configured weight for priority n
// waiters[n] = the number of waiters for priority n
// runnerCounts[n] = number of runners at priority n
//
- // totalActiveLaunchLimits = sum of limits for all priorities with waiters[n] > 0
- // When waiters[n] becomes == 0, totalActiveLaunchLimits -= launchLimits[n]
- // When waiters[n] becomes > 0, totalActiveLaunchLimits += launchLimits[n]
+ // totalPendingWeights = sum of weights for all priorities with waiters[n] > 0
+ // When waiters[n] becomes == 0, totalPendingWeights -= weights[n]
+ // When waiters[n] becomes > 0, totalPendingWeights += weights[n]
//
// The total capacity of a priority to be considered when launching tasks is
- // ceil(launchLimits[n] / totalLimits * concurrency)
+ // ceil(weights[n] / totalPendingWeights * concurrency)
//
// For improved memory locality the properties mentioned above are stored as priorities[n].<property>
// in the actual implementation.
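
A small worked example of the capacity formula above, as a standalone sketch; the concurrency and weight values are invented for illustration and do not come from this change:

// Standalone sketch of the capacity rule: with concurrency = 10 and weights {8, 1, 1},
// all three priorities having waiters gives totalPendingWeights = 10 and capacities
// ceil(8/10*10) = 8, ceil(1/10*10) = 1, ceil(1/10*10) = 1. If priority 0 had no
// waiters, totalPendingWeights would drop to 2 and priorities 1 and 2 would each
// get ceil(1/2*10) = 5.
#include <cmath>
#include <cstdio>
#include <vector>

int currentCapacity(int weight, int totalPendingWeights, int concurrency) {
    return (int)std::ceil((float)weight / totalPendingWeights * concurrency);
}

int main() {
    const int concurrency = 10;
    const std::vector<int> weights = { 8, 1, 1 }; // hypothetical weight vector
    int totalPendingWeights = 0;
    for (int w : weights)
        totalPendingWeights += w; // assume every priority currently has waiters
    for (size_t i = 0; i < weights.size(); ++i)
        std::printf("priority %zu capacity = %d\n", i, currentCapacity(weights[i], totalPendingWeights, concurrency));
}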
@ -80,15 +80,15 @@ public:
Promise<Void> promise;
};
- PriorityMultiLock(int concurrency, std::string launchLimits)
- : PriorityMultiLock(concurrency, parseStringToVector<int>(launchLimits, ',')) {}
+ PriorityMultiLock(int concurrency, std::string weights)
+ : PriorityMultiLock(concurrency, parseStringToVector<int>(weights, ',')) {}
- PriorityMultiLock(int concurrency, std::vector<int> launchLimitsByPriority)
- : concurrency(concurrency), available(concurrency), waiting(0), totalActiveLaunchLimits(0), releaseDebugID(0) {
+ PriorityMultiLock(int concurrency, std::vector<int> weightsByPriority)
+ : concurrency(concurrency), available(concurrency), waiting(0), totalPendingWeights(0), releaseDebugID(0) {
- priorities.resize(launchLimitsByPriority.size());
+ priorities.resize(weightsByPriority.size());
for (int i = 0; i < priorities.size(); ++i) {
- priorities[i].launchLimit = launchLimitsByPriority[i];
+ priorities[i].weight = weightsByPriority[i];
}
fRunner = runner(this);
@ -99,17 +99,16 @@ public:
Future<Lock> lock(int priority = 0) {
Priority& p = priorities[priority];
Queue& q = p.queue;
- Waiter w;
// If this priority currently has no waiters
if (q.empty()) {
- // Add this priority's launch limit to totalLimits
- totalActiveLaunchLimits += p.launchLimit;
+ // Add this priority's weight to the total for priorities with pending work
+ totalPendingWeights += p.weight;
// If there are slots available and the priority has capacity then don't make the caller wait
- if (available > 0 && p.runners < currentCapacity(p.launchLimit)) {
- // Remove this priority's launch limit from the total since it will remain empty
- totalActiveLaunchLimits -= p.launchLimit;
+ if (available > 0 && p.runners < currentCapacity(p.weight)) {
+ // Remove this priority's weight from the total since it will remain empty
+ totalPendingWeights -= p.weight;
// Return a Lock to the caller
Lock lock;
@ -119,6 +118,8 @@ public:
return lock;
}
}
+ Waiter w;
q.push_back(w);
++waiting;
@ -144,7 +145,7 @@ public:
}
std::string s = format("{ ptr=%p concurrency=%d available=%d running=%d waiting=%d runnersQueue=%d "
"runnersDone=%d activeLimits=%d ",
"runnersDone=%d pendingWeights=%d ",
this,
concurrency,
available,
@ -152,7 +153,7 @@ public:
waiting,
runners.size(),
runnersDone,
- totalActiveLaunchLimits);
+ totalPendingWeights);
for (int i = 0; i < priorities.size(); ++i) {
s += format("p%d:{%s} ", i, priorities[i].toString(this).c_str());
@ -196,27 +197,27 @@ private:
int available;
// Total waiters across all priorities
int waiting;
- // Sum of launch limits for all priorities with 1 or more waiters
- int totalActiveLaunchLimits;
+ // Sum of weights for all priorities with 1 or more waiters
+ int totalPendingWeights;
typedef Deque<Waiter> Queue;
struct Priority {
- Priority() : runners(0), launchLimit(0) {}
+ Priority() : runners(0), weight(0) {}
// Queue of waiters at this priority
Queue queue;
// Number of runners at this priority
int runners;
- // Configured launch limit for this priority
- int launchLimit;
+ // Configured weight for this priority
+ int weight;
std::string toString(const PriorityMultiLock* pml) const {
return format("limit=%d run=%d wait=%d cap=%d",
launchLimit,
return format("weight=%d run=%d wait=%d cap=%d",
weight,
runners,
queue.size(),
- queue.empty() ? 0 : pml->currentCapacity(launchLimit));
+ queue.empty() ? 0 : pml->currentCapacity(weight));
}
};
@ -270,10 +271,10 @@ private:
// Current maximum running tasks for the specified priority, which must have waiters
// or the result is undefined
- int currentCapacity(int launchLimit) const {
+ int currentCapacity(int weight) const {
// The total concurrency allowed for this priority at present is the total concurrency times
- // priority's launch limit divided by the total launch limits for all priorities with waiters.
- return ceil((float)launchLimit / totalActiveLaunchLimits * concurrency);
+ // priority's weight divided by the total weights for all priorities with waiters.
+ return ceil((float)weight / totalPendingWeights * concurrency);
}
ACTOR static Future<Void> runner(PriorityMultiLock* self) {
@ -329,8 +330,7 @@ private:
priority,
self->toString().c_str());
- if (!pPriority->queue.empty() &&
- pPriority->runners < self->currentCapacity(pPriority->launchLimit)) {
+ if (!pPriority->queue.empty() && pPriority->runners < self->currentCapacity(pPriority->weight)) {
break;
}
}
@ -340,9 +340,9 @@ private:
Waiter w = queue.front();
queue.pop_front();
- // If this priority is now empty, subtract its launch limit from totalLimits
+ // If this priority is now empty, subtract its weight from the total pending weights
if (queue.empty()) {
- self->totalActiveLaunchLimits -= pPriority->launchLimit;
+ self->totalPendingWeights -= pPriority->weight;
pml_debug_printf(" emptied priority line %d priority=%d %s\n",
__LINE__,