Modularize and comment GrvTransactionRateInfo

This commit is contained in:
sfc-gh-tclinkenbeard 2022-10-03 10:09:09 -07:00
parent 9795347aa9
commit 4c973c11ad
3 changed files with 199 additions and 88 deletions

View File

@ -24,11 +24,12 @@
#include "fdbclient/Notified.h" #include "fdbclient/Notified.h"
#include "fdbclient/TransactionLineage.h" #include "fdbclient/TransactionLineage.h"
#include "fdbclient/Tuple.h" #include "fdbclient/Tuple.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbclient/CommitProxyInterface.h" #include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/GrvProxyInterface.h" #include "fdbclient/GrvProxyInterface.h"
#include "fdbclient/VersionVector.h" #include "fdbclient/VersionVector.h"
#include "fdbserver/GrvTransactionRateInfo.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/WaitFailure.h" #include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h" #include "fdbserver/WorkerInterface.actor.h"
#include "fdbrpc/sim_validation.h" #include "fdbrpc/sim_validation.h"
@ -158,83 +159,6 @@ struct GrvProxyStats {
} }
}; };
struct GrvTransactionRateInfo {
double rate;
double limit;
double budget;
bool disabled;
Smoother smoothRate;
Smoother smoothReleased;
GrvTransactionRateInfo(double rate = 0.0)
: rate(rate), limit(0), budget(0), disabled(true), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {}
void reset() {
// Determine the number of transactions that this proxy is allowed to release
// Roughly speaking, this is done by computing the number of transactions over some historical window that we
// could have started but didn't, and making that our limit. More precisely, we track a smoothed rate limit and
// release rate, the difference of which is the rate of additional transactions that we could have released
// based on that window. Then we multiply by the window size to get a number of transactions.
//
// Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the
// use of our budget or because of higher priority transactions).
double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate;
}
bool canStart(int64_t numAlreadyStarted, int64_t count) const {
return numAlreadyStarted + count <=
std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
}
void updateBudget(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) {
// Update the budget to accumulate any extra capacity available or remove any excess that was used.
// The actual delta is the portion of the limit we didn't use multiplied by the fraction of the window that
// elapsed.
//
// We may have exceeded our limit due to the budget or because of higher priority transactions, in which case
// this delta will be negative. The delta can also be negative in the event that our limit was negative, which
// can happen if we had already started more transactions in our window than our rate would have allowed.
//
// This budget has the property that when the budget is required to start transactions (because batches are
// big), the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the
// batch size upon starting a batch. In other words, this works equivalently to a model where we linearly
// accumulate budget over time in the case that our batches are too big to take advantage of the window based
// limits.
budget = std::max(
0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);
// If we are emptying out the queue of requests, then we don't need to carry much budget forward
// If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised
if (queueEmptyAtPriority) {
budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
}
smoothReleased.addDelta(numStartedAtPriority);
}
void disable() {
disabled = true;
// Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately.
smoothRate.setTotal(0);
}
void setRate(double rate) {
ASSERT(rate >= 0 && rate != std::numeric_limits<double>::infinity() && !std::isnan(rate));
this->rate = rate;
if (disabled) {
smoothRate.reset(rate);
disabled = false;
} else {
smoothRate.setTotal(rate);
}
}
};
struct GrvProxyData { struct GrvProxyData {
GrvProxyInterface proxy; GrvProxyInterface proxy;
UID dbgid; UID dbgid;
@ -622,7 +546,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(
} else { } else {
// Return error for batch_priority GRV requests // Return error for batch_priority GRV requests
int64_t proxiesCount = std::max((int)db->get().client.grvProxies.size(), 1); int64_t proxiesCount = std::max((int)db->get().client.grvProxies.size(), 1);
if (batchRateInfo->rate <= (1.0 / proxiesCount)) { if (batchRateInfo->getRate() <= (1.0 / proxiesCount)) {
req.reply.sendError(batch_transaction_throttled()); req.reply.sendError(batch_transaction_throttled());
stats->txnThrottled += req.transactionCount; stats->txnThrottled += req.transactionCount;
} else { } else {
@ -960,11 +884,11 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
elapsed = 1e-15; elapsed = 1e-15;
} }
normalRateInfo.reset(); normalRateInfo.startEpoch();
batchRateInfo.reset(); batchRateInfo.startEpoch();
grvProxyData->stats.transactionLimit = normalRateInfo.limit; grvProxyData->stats.transactionLimit = normalRateInfo.getLimit();
grvProxyData->stats.batchTransactionLimit = batchRateInfo.limit; grvProxyData->stats.batchTransactionLimit = batchRateInfo.getLimit();
int transactionsStarted[2] = { 0, 0 }; int transactionsStarted[2] = { 0, 0 };
int systemTransactionsStarted[2] = { 0, 0 }; int systemTransactionsStarted[2] = { 0, 0 };
@ -1071,11 +995,11 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
transactionCount += transactionsStarted[0] + transactionsStarted[1]; transactionCount += transactionsStarted[0] + transactionsStarted[1];
batchTransactionCount += batchTotalStarted; batchTransactionCount += batchTotalStarted;
normalRateInfo.updateBudget( normalRateInfo.endEpoch(
systemTotalStarted + normalTotalStarted, systemQueue.empty() && defaultQueue.empty(), elapsed); systemTotalStarted + normalTotalStarted, systemQueue.empty() && defaultQueue.empty(), elapsed);
batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, batchRateInfo.endEpoch(systemTotalStarted + normalTotalStarted + batchTotalStarted,
systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(), systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(),
elapsed); elapsed);
if (debugID.present()) { if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", g_traceBatch.addEvent("TransactionDebug",

View File

@ -0,0 +1,118 @@
/*
* GrvTransactionRateInfo.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fdbserver/GrvTransactionRateInfo.h"
#include "fdbserver/Knobs.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // must be last include
GrvTransactionRateInfo::GrvTransactionRateInfo(double rate)
: rate(rate), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {}
bool GrvTransactionRateInfo::canStart(int64_t numAlreadyStarted, int64_t count) const {
return numAlreadyStarted + count <=
std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
}
void GrvTransactionRateInfo::endEpoch(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) {
// Update the budget to accumulate any extra capacity available or remove any excess that was used.
// The actual delta is the portion of the limit we didn't use multiplied by the fraction of the window that
// elapsed.
//
// We may have exceeded our limit due to the budget or because of higher priority transactions, in which case
// this delta will be negative. The delta can also be negative in the event that our limit was negative, which
// can happen if we had already started more transactions in our window than our rate would have allowed.
//
// This budget has the property that when the budget is required to start transactions (because batches are
// big), the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the
// batch size upon starting a batch. In other words, this works equivalently to a model where we linearly
// accumulate budget over time in the case that our batches are too big to take advantage of the window based
// limits.
budget =
std::max(0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW);
// If we are emptying out the queue of requests, then we don't need to carry much budget forward
// If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised
if (queueEmptyAtPriority) {
budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
}
smoothReleased.addDelta(numStartedAtPriority);
}
void GrvTransactionRateInfo::disable() {
disabled = true;
// Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately.
smoothRate.setTotal(0);
}
void GrvTransactionRateInfo::setRate(double rate) {
ASSERT(rate >= 0 && rate != std::numeric_limits<double>::infinity() && !std::isnan(rate));
this->rate = rate;
if (disabled) {
smoothRate.reset(rate);
disabled = false;
} else {
smoothRate.setTotal(rate);
}
}
void GrvTransactionRateInfo::startEpoch() {
// Determine the number of transactions that this proxy is allowed to release
// Roughly speaking, this is done by computing the number of transactions over some historical window that we
// could have started but didn't, and making that our limit. More precisely, we track a smoothed rate limit and
// release rate, the difference of which is the rate of additional transactions that we could have released
// based on that window. Then we multiply by the window size to get a number of transactions.
//
// Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the
// use of our budget or because of higher priority transactions).
double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate;
}
static bool isNear(double desired, int64_t actual) {
return std::abs(desired - actual) * 10 < desired;
}
ACTOR static Future<Void> mockClient(GrvTransactionRateInfo* rateInfo, double desiredRate, int64_t* counter) {
loop {
state double elapsed = (0.9 + 0.2 * deterministicRandom()->random01()) / desiredRate;
wait(delay(elapsed));
rateInfo->startEpoch();
int started = rateInfo->canStart(0, 1) ? 1 : 0;
*counter += started;
rateInfo->endEpoch(started, false, elapsed);
}
}
// Rate limit set at 10, but client attempts 20 transactions per second.
// Client should be throttled to only 10 transactions per second.
TEST_CASE("/GrvTransactionRateInfo/Simple") {
state GrvTransactionRateInfo rateInfo;
state int64_t counter;
rateInfo.setRate(10.0);
wait(timeout(mockClient(&rateInfo, 20.0, &counter), 60.0, Void()));
TraceEvent("GrvTransactionRateInfoTest").detail("Counter", counter);
ASSERT(isNear(60.0 * 20.0, counter));
return Void();
}

View File

@ -0,0 +1,69 @@
/*
* GrvTransactionRateInfo.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "fdbrpc/Smoother.h"
// Used by GRV Proxy to enforce rate limits received from the Ratekeeper.
//
// Between delays, the GrvTransactionRateInfo executes an "epoch" starting
// with a call to the startEpoch method. Within this epoch, transactions are
// released while canStart returns true. At the end of the epoch, the
// endEpoch method is called, and the budget is updated to add or
// remove capacity.
//
// Meanwhile, the desired rate is updated through the setRate method.
//
// Smoothers are used to avoid turbulent throttling behaviour.
class GrvTransactionRateInfo {
double rate{ 0.0 };
double limit{ 0.0 };
double budget{ 0.0 };
bool disabled{ true };
Smoother smoothRate;
Smoother smoothReleased;
public:
explicit GrvTransactionRateInfo(double rate = 0.0);
// Determines the number of transactions that this proxy is allowed to release
// in this epoch.
void startEpoch();
// Checks if a "count" new transactions can be released, given that
// "numAlreadyStarted" transactions have already been released in the
// current epoch.
bool canStart(int64_t numAlreadyStarted, int64_t count) const;
// Updates the budget to accumulate any extra capacity available or remove any excess that was used.
// Call at the end of an epoch.
void endEpoch(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed);
// Smoothly sets rate. If currently disabled, reenable
void setRate(double rate);
// Smoothly sets transaction rate to 0. Call disable when new rates have not been
// set for a sufficiently long period of time.
void disable();
double getRate() const { return rate; }
double getLimit() const { return limit; }
};