Change GrvProxy tag throttling algorithm.
The new algorithm assumes there is only one tag per request, so queues are partitioned by tag. This is a more efficient approach than the old algorithm.
This commit is contained in:
parent
bbf69b2d0a
commit
24a0dd9f17
|
@ -27,10 +27,10 @@
|
||||||
#include "fdbclient/CommitProxyInterface.h"
|
#include "fdbclient/CommitProxyInterface.h"
|
||||||
#include "fdbclient/GrvProxyInterface.h"
|
#include "fdbclient/GrvProxyInterface.h"
|
||||||
#include "fdbclient/VersionVector.h"
|
#include "fdbclient/VersionVector.h"
|
||||||
|
#include "fdbserver/GrvProxyTransactionTagThrottler.h"
|
||||||
#include "fdbserver/GrvTransactionRateInfo.h"
|
#include "fdbserver/GrvTransactionRateInfo.h"
|
||||||
#include "fdbserver/LogSystem.h"
|
#include "fdbserver/LogSystem.h"
|
||||||
#include "fdbserver/LogSystemDiskQueueAdapter.h"
|
#include "fdbserver/LogSystemDiskQueueAdapter.h"
|
||||||
#include "fdbserver/TagQueue.h"
|
|
||||||
#include "fdbserver/WaitFailure.h"
|
#include "fdbserver/WaitFailure.h"
|
||||||
#include "fdbserver/WorkerInterface.actor.h"
|
#include "fdbserver/WorkerInterface.actor.h"
|
||||||
#include "fdbrpc/sim_validation.h"
|
#include "fdbrpc/sim_validation.h"
|
||||||
|
@ -362,7 +362,7 @@ ACTOR Future<Void> getRate(UID myID,
|
||||||
GetHealthMetricsReply* detailedHealthMetricsReply,
|
GetHealthMetricsReply* detailedHealthMetricsReply,
|
||||||
TransactionTagMap<uint64_t>* transactionTagCounter,
|
TransactionTagMap<uint64_t>* transactionTagCounter,
|
||||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* clientThrottledTags,
|
PrioritizedTransactionTagMap<ClientTagThrottleLimits>* clientThrottledTags,
|
||||||
TagQueue* tagQueue,
|
GrvProxyTransactionTagThrottler* tagThrottler,
|
||||||
GrvProxyStats* stats,
|
GrvProxyStats* stats,
|
||||||
GrvProxyData* proxyData) {
|
GrvProxyData* proxyData) {
|
||||||
state Future<Void> nextRequestTimer = Never();
|
state Future<Void> nextRequestTimer = Never();
|
||||||
|
@ -423,7 +423,7 @@ ACTOR Future<Void> getRate(UID myID,
|
||||||
*clientThrottledTags = std::move(rep.clientThrottledTags.get());
|
*clientThrottledTags = std::move(rep.clientThrottledTags.get());
|
||||||
}
|
}
|
||||||
if (rep.proxyThrottledTags.present()) {
|
if (rep.proxyThrottledTags.present()) {
|
||||||
tagQueue->updateRates(rep.proxyThrottledTags.get());
|
tagThrottler->updateRates(rep.proxyThrottledTags.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
when(wait(leaseTimeout)) {
|
when(wait(leaseTimeout)) {
|
||||||
|
@ -469,7 +469,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>
|
||||||
GrvProxyStats* stats,
|
GrvProxyStats* stats,
|
||||||
GrvTransactionRateInfo* batchRateInfo,
|
GrvTransactionRateInfo* batchRateInfo,
|
||||||
TransactionTagMap<uint64_t>* transactionTagCounter,
|
TransactionTagMap<uint64_t>* transactionTagCounter,
|
||||||
TagQueue* tagQueue) {
|
GrvProxyTransactionTagThrottler* tagThrottler) {
|
||||||
getCurrentLineage()->modify(&TransactionLineage::operation) =
|
getCurrentLineage()->modify(&TransactionLineage::operation) =
|
||||||
TransactionLineage::Operation::GetConsistentReadVersion;
|
TransactionLineage::Operation::GetConsistentReadVersion;
|
||||||
loop choose {
|
loop choose {
|
||||||
|
@ -537,7 +537,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>
|
||||||
stats->txnDefaultPriorityStartIn += req.transactionCount;
|
stats->txnDefaultPriorityStartIn += req.transactionCount;
|
||||||
++stats->defaultGRVQueueSize;
|
++stats->defaultGRVQueueSize;
|
||||||
if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES) {
|
if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES) {
|
||||||
tagQueue->addRequest(req);
|
tagThrottler->addRequest(req);
|
||||||
} else {
|
} else {
|
||||||
defaultQueue->push_back(req);
|
defaultQueue->push_back(req);
|
||||||
}
|
}
|
||||||
|
@ -554,7 +554,7 @@ ACTOR Future<Void> queueGetReadVersionRequests(Reference<AsyncVar<ServerDBInfo>
|
||||||
stats->txnBatchPriorityStartIn += req.transactionCount;
|
stats->txnBatchPriorityStartIn += req.transactionCount;
|
||||||
++stats->batchGRVQueueSize;
|
++stats->batchGRVQueueSize;
|
||||||
if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES) {
|
if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES) {
|
||||||
tagQueue->addRequest(req);
|
tagThrottler->addRequest(req);
|
||||||
} else {
|
} else {
|
||||||
batchQueue->push_back(req);
|
batchQueue->push_back(req);
|
||||||
}
|
}
|
||||||
|
@ -823,7 +823,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
||||||
state int64_t batchTransactionCount = 0;
|
state int64_t batchTransactionCount = 0;
|
||||||
state GrvTransactionRateInfo normalRateInfo(10);
|
state GrvTransactionRateInfo normalRateInfo(10);
|
||||||
state GrvTransactionRateInfo batchRateInfo(0);
|
state GrvTransactionRateInfo batchRateInfo(0);
|
||||||
state TagQueue tagQueue;
|
state GrvProxyTransactionTagThrottler tagThrottler;
|
||||||
|
|
||||||
state SpannedDeque<GetReadVersionRequest> systemQueue("GP:transactionStarterSystemQueue"_loc);
|
state SpannedDeque<GetReadVersionRequest> systemQueue("GP:transactionStarterSystemQueue"_loc);
|
||||||
state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:transactionStarterDefaultQueue"_loc);
|
state SpannedDeque<GetReadVersionRequest> defaultQueue("GP:transactionStarterDefaultQueue"_loc);
|
||||||
|
@ -850,7 +850,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
||||||
detailedHealthMetricsReply,
|
detailedHealthMetricsReply,
|
||||||
&transactionTagCounter,
|
&transactionTagCounter,
|
||||||
&clientThrottledTags,
|
&clientThrottledTags,
|
||||||
&tagQueue,
|
&tagThrottler,
|
||||||
&grvProxyData->stats,
|
&grvProxyData->stats,
|
||||||
grvProxyData));
|
grvProxyData));
|
||||||
addActor.send(queueGetReadVersionRequests(db,
|
addActor.send(queueGetReadVersionRequests(db,
|
||||||
|
@ -865,7 +865,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
||||||
&grvProxyData->stats,
|
&grvProxyData->stats,
|
||||||
&batchRateInfo,
|
&batchRateInfo,
|
||||||
&transactionTagCounter,
|
&transactionTagCounter,
|
||||||
&tagQueue));
|
&tagThrottler));
|
||||||
|
|
||||||
while (std::find(db->get().client.grvProxies.begin(), db->get().client.grvProxies.end(), proxy) ==
|
while (std::find(db->get().client.grvProxies.begin(), db->get().client.grvProxies.end(), proxy) ==
|
||||||
db->get().client.grvProxies.end()) {
|
db->get().client.grvProxies.end()) {
|
||||||
|
@ -888,7 +888,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
|
||||||
elapsed = 1e-15;
|
elapsed = 1e-15;
|
||||||
}
|
}
|
||||||
|
|
||||||
tagQueue.releaseTransactions(elapsed, defaultQueue, batchQueue);
|
tagThrottler.releaseTransactions(elapsed, defaultQueue, batchQueue);
|
||||||
normalRateInfo.startReleaseWindow();
|
normalRateInfo.startReleaseWindow();
|
||||||
batchRateInfo.startReleaseWindow();
|
batchRateInfo.startReleaseWindow();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,196 @@
|
||||||
|
#include "fdbserver/GrvProxyTransactionTagThrottler.h"
|
||||||
|
#include "flow/UnitTest.h"
|
||||||
|
#include "flow/actorcompiler.h" // must be last include
|
||||||
|
|
||||||
|
void GrvProxyTransactionTagThrottler::updateRates(TransactionTagMap<double> const& newRates) {
|
||||||
|
for (const auto& [tag, rate] : newRates) {
|
||||||
|
auto it = queues.find(tag);
|
||||||
|
if (it == queues.end()) {
|
||||||
|
queues[tag] = TagQueue(rate);
|
||||||
|
} else {
|
||||||
|
it->second.setRate(rate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up tags that did not appear in newRates
|
||||||
|
for (auto& [tag, queue] : queues) {
|
||||||
|
if (newRates.find(tag) == newRates.end()) {
|
||||||
|
queue.rateInfo.reset();
|
||||||
|
if (queue.requests.empty()) {
|
||||||
|
// FIXME: Use cleaner method of cleanup
|
||||||
|
queues.erase(tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void GrvProxyTransactionTagThrottler::addRequest(GetReadVersionRequest const& req) {
|
||||||
|
if (req.tags.empty()) {
|
||||||
|
untaggedRequests.push_back(req);
|
||||||
|
} else {
|
||||||
|
auto const& tag = req.tags.begin()->first;
|
||||||
|
if (req.tags.size() > 1) {
|
||||||
|
// The GrvProxyTransactionTagThrottler assumes that each GetReadVersionRequest
|
||||||
|
// has at most one tag. If a transaction uses multiple tags and
|
||||||
|
// SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES is enabled, there may be
|
||||||
|
// unexpected behaviour, because only one tag is used for throttling.
|
||||||
|
TraceEvent(SevWarnAlways, "GrvProxyTransactionTagThrottler_MultipleTags")
|
||||||
|
.detail("NumTags", req.tags.size())
|
||||||
|
.detail("UsingTag", printable(tag));
|
||||||
|
}
|
||||||
|
queues[tag].requests.emplace_back(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void GrvProxyTransactionTagThrottler::TagQueue::releaseTransactions(
|
||||||
|
double elapsed,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
|
||||||
|
Deque<DelayedRequest> newDelayedRequests;
|
||||||
|
if (rateInfo.present())
|
||||||
|
rateInfo.get().startReleaseWindow();
|
||||||
|
int transactionsReleased = 0;
|
||||||
|
while (!requests.empty()) {
|
||||||
|
auto& delayedReq = requests.front();
|
||||||
|
auto& req = delayedReq.req;
|
||||||
|
auto const count = req.tags.begin()->second;
|
||||||
|
if (!rateInfo.present() || rateInfo.get().canStart(transactionsReleased, count)) {
|
||||||
|
req.proxyTagThrottledDuration = now() - delayedReq.startTime;
|
||||||
|
transactionsReleased += count;
|
||||||
|
if (req.priority == TransactionPriority::BATCH) {
|
||||||
|
outBatchPriority.push_back(req);
|
||||||
|
} else if (req.priority == TransactionPriority::DEFAULT) {
|
||||||
|
outDefaultPriority.push_back(req);
|
||||||
|
} else {
|
||||||
|
// Immediate priority transactions should bypass the GrvProxyTransactionTagThrottler
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
newDelayedRequests.push_back(delayedReq);
|
||||||
|
}
|
||||||
|
requests.pop_front();
|
||||||
|
}
|
||||||
|
if (rateInfo.present())
|
||||||
|
rateInfo.get().endReleaseWindow(transactionsReleased, false, elapsed);
|
||||||
|
requests = std::move(newDelayedRequests);
|
||||||
|
}
|
||||||
|
|
||||||
|
void GrvProxyTransactionTagThrottler::releaseTransactions(double elapsed,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
|
||||||
|
for (auto& [_, tagQueue] : queues) {
|
||||||
|
tagQueue.releaseTransactions(elapsed, outBatchPriority, outDefaultPriority);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR static Future<Void> mockClient(GrvProxyTransactionTagThrottler* throttler,
|
||||||
|
TransactionPriority priority,
|
||||||
|
TagSet tagSet,
|
||||||
|
int batchSize,
|
||||||
|
double desiredRate,
|
||||||
|
TransactionTagMap<uint32_t>* counters) {
|
||||||
|
state Future<Void> timer;
|
||||||
|
state TransactionTagMap<uint32_t> tags;
|
||||||
|
for (const auto& tag : tagSet) {
|
||||||
|
tags[tag] = batchSize;
|
||||||
|
}
|
||||||
|
loop {
|
||||||
|
timer = delayJittered(static_cast<double>(batchSize) / desiredRate);
|
||||||
|
GetReadVersionRequest req;
|
||||||
|
req.tags = tags;
|
||||||
|
req.priority = priority;
|
||||||
|
throttler->addRequest(req);
|
||||||
|
wait(success(req.reply.getFuture()) && timer);
|
||||||
|
for (auto& [tag, _] : tags) {
|
||||||
|
(*counters)[tag] += batchSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ACTOR static Future<Void> mockServer(GrvProxyTransactionTagThrottler* throttler) {
|
||||||
|
state SpannedDeque<GetReadVersionRequest> outBatchPriority("TestGrvProxyTransactionTagThrottler_Batch"_loc);
|
||||||
|
state SpannedDeque<GetReadVersionRequest> outDefaultPriority("TestGrvProxyTransactionTagThrottler_Default"_loc);
|
||||||
|
loop {
|
||||||
|
state double elapsed = (0.009 + 0.002 * deterministicRandom()->random01());
|
||||||
|
wait(delay(elapsed));
|
||||||
|
throttler->releaseTransactions(elapsed, outBatchPriority, outDefaultPriority);
|
||||||
|
while (!outBatchPriority.empty()) {
|
||||||
|
outBatchPriority.front().reply.send(GetReadVersionReply{});
|
||||||
|
outBatchPriority.pop_front();
|
||||||
|
}
|
||||||
|
while (!outDefaultPriority.empty()) {
|
||||||
|
outDefaultPriority.front().reply.send(GetReadVersionReply{});
|
||||||
|
outDefaultPriority.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isNear(double desired, int64_t actual) {
|
||||||
|
return std::abs(desired - actual) * 10 < desired;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rate limit set at 10, but client attempts 20 transactions per second.
|
||||||
|
// Client should be throttled to only 10 transactions per second.
|
||||||
|
TEST_CASE("/GrvProxyTransactionTagThrottler/Simple") {
|
||||||
|
state GrvProxyTransactionTagThrottler throttler;
|
||||||
|
state TagSet tagSet;
|
||||||
|
state TransactionTagMap<uint32_t> counters;
|
||||||
|
{
|
||||||
|
TransactionTagMap<double> rates;
|
||||||
|
rates["sampleTag"_sr] = 10.0;
|
||||||
|
throttler.updateRates(rates);
|
||||||
|
}
|
||||||
|
tagSet.addTag("sampleTag"_sr);
|
||||||
|
|
||||||
|
state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters);
|
||||||
|
state Future<Void> server = mockServer(&throttler);
|
||||||
|
wait(timeout(client && server, 60.0, Void()));
|
||||||
|
TraceEvent("TagQuotaTest_Simple").detail("Counter", counters["sampleTag"_sr]);
|
||||||
|
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clients share the available 30 transaction/second budget
|
||||||
|
TEST_CASE("/GrvProxyTransactionTagThrottler/MultiClient") {
|
||||||
|
state GrvProxyTransactionTagThrottler throttler;
|
||||||
|
state TagSet tagSet;
|
||||||
|
state TransactionTagMap<uint32_t> counters;
|
||||||
|
{
|
||||||
|
TransactionTagMap<double> rates;
|
||||||
|
rates["sampleTag"_sr] = 30.0;
|
||||||
|
throttler.updateRates(rates);
|
||||||
|
}
|
||||||
|
tagSet.addTag("sampleTag"_sr);
|
||||||
|
|
||||||
|
state std::vector<Future<Void>> clients;
|
||||||
|
clients.reserve(10);
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
clients.push_back(mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 10.0, &counters));
|
||||||
|
}
|
||||||
|
|
||||||
|
state Future<Void> server = mockServer(&throttler);
|
||||||
|
wait(timeout(waitForAll(clients) && server, 60.0, Void()));
|
||||||
|
TraceEvent("TagQuotaTest_MultiClient").detail("Counter", counters["sampleTag"_sr]);
|
||||||
|
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 30.0));
|
||||||
|
return Void();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_CASE("/GrvProxyTransactionTagThrottler/Batch") {
|
||||||
|
state GrvProxyTransactionTagThrottler throttler;
|
||||||
|
state TagSet tagSet;
|
||||||
|
state TransactionTagMap<uint32_t> counters;
|
||||||
|
{
|
||||||
|
TransactionTagMap<double> rates;
|
||||||
|
rates["sampleTag"_sr] = 10.0;
|
||||||
|
throttler.updateRates(rates);
|
||||||
|
}
|
||||||
|
tagSet.addTag("sampleTag"_sr);
|
||||||
|
|
||||||
|
state Future<Void> client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 5, 20.0, &counters);
|
||||||
|
state Future<Void> server = mockServer(&throttler);
|
||||||
|
wait(timeout(client && server, 60.0, Void()));
|
||||||
|
|
||||||
|
TraceEvent("TagQuotaTest_Batch").detail("Counter", counters["sampleTag"_sr]);
|
||||||
|
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
|
||||||
|
return Void();
|
||||||
|
}
|
|
@ -1,235 +0,0 @@
|
||||||
#include "fdbserver/TagQueue.h"
|
|
||||||
#include "flow/UnitTest.h"
|
|
||||||
#include "flow/actorcompiler.h" // must be last include
|
|
||||||
|
|
||||||
void TagQueue::updateRates(TransactionTagMap<double> const& newRates) {
|
|
||||||
for (const auto& [tag, rate] : newRates) {
|
|
||||||
auto it = rateInfos.find(tag);
|
|
||||||
if (it == rateInfos.end()) {
|
|
||||||
rateInfos[tag] = GrvTransactionRateInfo(rate);
|
|
||||||
} else {
|
|
||||||
it->second.setRate(rate);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const auto& [tag, _] : rateInfos) {
|
|
||||||
if (newRates.find(tag) == newRates.end()) {
|
|
||||||
rateInfos.erase(tag);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool TagQueue::canStart(TransactionTag tag, int64_t alreadyReleased, int64_t count) const {
|
|
||||||
auto it = rateInfos.find(tag);
|
|
||||||
if (it == rateInfos.end()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return it->second.canStart(alreadyReleased, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool TagQueue::canStart(GetReadVersionRequest req, TransactionTagMap<int64_t>& releasedInEpoch) const {
|
|
||||||
for (const auto& [tag, count] : req.tags) {
|
|
||||||
if (!canStart(tag, releasedInEpoch[tag], count)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void TagQueue::addRequest(GetReadVersionRequest req) {
|
|
||||||
newRequests.push_back(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
void TagQueue::releaseTransactions(double elapsed,
|
|
||||||
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
|
||||||
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
|
|
||||||
for (auto& [_, rateInfo] : rateInfos) {
|
|
||||||
rateInfo.startReleaseWindow();
|
|
||||||
}
|
|
||||||
|
|
||||||
Deque<DelayedRequest> newDelayedRequests;
|
|
||||||
TransactionTagMap<int64_t> releasedInEpoch;
|
|
||||||
|
|
||||||
while (!delayedRequests.empty()) {
|
|
||||||
auto& delayedReq = delayedRequests.front();
|
|
||||||
auto& req = delayedReq.req;
|
|
||||||
if (canStart(req, releasedInEpoch)) {
|
|
||||||
for (const auto& [tag, count] : req.tags) {
|
|
||||||
releasedInEpoch[tag] += count;
|
|
||||||
}
|
|
||||||
req.proxyTagThrottledDuration = delayedReq.delayTime();
|
|
||||||
if (req.priority == TransactionPriority::BATCH) {
|
|
||||||
outBatchPriority.push_back(req);
|
|
||||||
} else if (req.priority == TransactionPriority::DEFAULT) {
|
|
||||||
outDefaultPriority.push_back(req);
|
|
||||||
} else {
|
|
||||||
// Immediate priority transactions should bypass the TagQueue
|
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
newDelayedRequests.push_back(delayedReq);
|
|
||||||
}
|
|
||||||
delayedRequests.pop_front();
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!newRequests.empty()) {
|
|
||||||
auto const& req = newRequests.front();
|
|
||||||
if (canStart(req, releasedInEpoch)) {
|
|
||||||
for (const auto& [tag, count] : req.tags) {
|
|
||||||
releasedInEpoch[tag] += count;
|
|
||||||
}
|
|
||||||
if (req.priority == TransactionPriority::BATCH) {
|
|
||||||
outBatchPriority.push_back(req);
|
|
||||||
} else if (req.priority == TransactionPriority::DEFAULT) {
|
|
||||||
outDefaultPriority.push_back(req);
|
|
||||||
} else {
|
|
||||||
// Immediate priority transactions should bypass the TagQueue
|
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
newDelayedRequests.emplace_back(req);
|
|
||||||
}
|
|
||||||
newRequests.pop_front();
|
|
||||||
}
|
|
||||||
|
|
||||||
delayedRequests = std::move(newDelayedRequests);
|
|
||||||
for (auto& [tag, rateInfo] : rateInfos) {
|
|
||||||
rateInfo.endReleaseWindow(std::move(releasedInEpoch)[tag], false, elapsed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR static Future<Void> mockClient(TagQueue* tagQueue,
|
|
||||||
TransactionPriority priority,
|
|
||||||
TagSet tagSet,
|
|
||||||
int batchSize,
|
|
||||||
double desiredRate,
|
|
||||||
TransactionTagMap<uint32_t>* counters) {
|
|
||||||
state Future<Void> timer;
|
|
||||||
state TransactionTagMap<uint32_t> tags;
|
|
||||||
for (const auto& tag : tagSet) {
|
|
||||||
tags[tag] = batchSize;
|
|
||||||
}
|
|
||||||
loop {
|
|
||||||
timer = delayJittered(static_cast<double>(batchSize) / desiredRate);
|
|
||||||
GetReadVersionRequest req;
|
|
||||||
req.tags = tags;
|
|
||||||
req.priority = priority;
|
|
||||||
tagQueue->addRequest(req);
|
|
||||||
wait(success(req.reply.getFuture()) && timer);
|
|
||||||
for (auto& [tag, _] : tags) {
|
|
||||||
(*counters)[tag] += batchSize;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ACTOR static Future<Void> mockServer(TagQueue* tagQueue) {
|
|
||||||
state SpannedDeque<GetReadVersionRequest> outBatchPriority("TestTagQueue_Batch"_loc);
|
|
||||||
state SpannedDeque<GetReadVersionRequest> outDefaultPriority("TestTagQueue_Default"_loc);
|
|
||||||
loop {
|
|
||||||
state double elapsed = (0.009 + 0.002 * deterministicRandom()->random01());
|
|
||||||
wait(delay(elapsed));
|
|
||||||
tagQueue->releaseTransactions(elapsed, outBatchPriority, outDefaultPriority);
|
|
||||||
while (!outBatchPriority.empty()) {
|
|
||||||
outBatchPriority.front().reply.send(GetReadVersionReply{});
|
|
||||||
outBatchPriority.pop_front();
|
|
||||||
}
|
|
||||||
while (!outDefaultPriority.empty()) {
|
|
||||||
outDefaultPriority.front().reply.send(GetReadVersionReply{});
|
|
||||||
outDefaultPriority.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool isNear(double desired, int64_t actual) {
|
|
||||||
return std::abs(desired - actual) * 10 < desired;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rate limit set at 10, but client attempts 20 transactions per second.
|
|
||||||
// Client should be throttled to only 10 transactions per second.
|
|
||||||
TEST_CASE("/TagQueue/Simple") {
|
|
||||||
state TagQueue tagQueue;
|
|
||||||
state TagSet tagSet;
|
|
||||||
state TransactionTagMap<uint32_t> counters;
|
|
||||||
{
|
|
||||||
TransactionTagMap<double> rates;
|
|
||||||
rates["sampleTag"_sr] = 10.0;
|
|
||||||
tagQueue.updateRates(rates);
|
|
||||||
}
|
|
||||||
tagSet.addTag("sampleTag"_sr);
|
|
||||||
|
|
||||||
state Future<Void> client = mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters);
|
|
||||||
state Future<Void> server = mockServer(&tagQueue);
|
|
||||||
wait(timeout(client && server, 60.0, Void()));
|
|
||||||
TraceEvent("TagQuotaTest_Simple").detail("Counter", counters["sampleTag"_sr]);
|
|
||||||
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Throttle based on the tag with the lowest rate
|
|
||||||
TEST_CASE("/TagQueue/MultiTag") {
|
|
||||||
state TagQueue tagQueue;
|
|
||||||
state TagSet tagSet;
|
|
||||||
state TransactionTagMap<uint32_t> counters;
|
|
||||||
{
|
|
||||||
TransactionTagMap<double> rates;
|
|
||||||
rates["sampleTag1"_sr] = 10.0;
|
|
||||||
rates["sampleTag2"_sr] = 20.0;
|
|
||||||
tagQueue.updateRates(rates);
|
|
||||||
}
|
|
||||||
tagSet.addTag("sampleTag1"_sr);
|
|
||||||
tagSet.addTag("sampleTag2"_sr);
|
|
||||||
|
|
||||||
state Future<Void> client = mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 1, 30.0, &counters);
|
|
||||||
state Future<Void> server = mockServer(&tagQueue);
|
|
||||||
wait(timeout(client && server, 60.0, Void()));
|
|
||||||
TraceEvent("TagQuotaTest_MultiTag").detail("Counter", counters["sampleTag1"_sr]);
|
|
||||||
ASSERT_EQ(counters["sampleTag1"_sr], counters["sampleTag2"_sr]);
|
|
||||||
ASSERT(isNear(counters["sampleTag1"_sr], 60.0 * 10.0));
|
|
||||||
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clients share the available 30 transaction/second budget
|
|
||||||
TEST_CASE("/TagQueue/MultiClient") {
|
|
||||||
state TagQueue tagQueue;
|
|
||||||
state TagSet tagSet;
|
|
||||||
state TransactionTagMap<uint32_t> counters;
|
|
||||||
{
|
|
||||||
TransactionTagMap<double> rates;
|
|
||||||
rates["sampleTag"_sr] = 30.0;
|
|
||||||
tagQueue.updateRates(rates);
|
|
||||||
}
|
|
||||||
tagSet.addTag("sampleTag"_sr);
|
|
||||||
|
|
||||||
state std::vector<Future<Void>> clients;
|
|
||||||
clients.reserve(10);
|
|
||||||
for (int i = 0; i < 10; ++i) {
|
|
||||||
clients.push_back(mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 1, 10.0, &counters));
|
|
||||||
}
|
|
||||||
|
|
||||||
state Future<Void> server = mockServer(&tagQueue);
|
|
||||||
wait(timeout(waitForAll(clients) && server, 60.0, Void()));
|
|
||||||
TraceEvent("TagQuotaTest_MultiClient").detail("Counter", counters["sampleTag"_sr]);
|
|
||||||
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 30.0));
|
|
||||||
return Void();
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_CASE("/TagQueue/Batch") {
|
|
||||||
state TagQueue tagQueue;
|
|
||||||
state TagSet tagSet;
|
|
||||||
state TransactionTagMap<uint32_t> counters;
|
|
||||||
{
|
|
||||||
TransactionTagMap<double> rates;
|
|
||||||
rates["sampleTag"_sr] = 10.0;
|
|
||||||
tagQueue.updateRates(rates);
|
|
||||||
}
|
|
||||||
tagSet.addTag("sampleTag"_sr);
|
|
||||||
|
|
||||||
state Future<Void> client = mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 5, 20.0, &counters);
|
|
||||||
state Future<Void> server = mockServer(&tagQueue);
|
|
||||||
wait(timeout(client && server, 60.0, Void()));
|
|
||||||
|
|
||||||
TraceEvent("TagQuotaTest_Batch").detail("Counter", counters["sampleTag"_sr]);
|
|
||||||
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0));
|
|
||||||
return Void();
|
|
||||||
}
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "fdbclient/CommitProxyInterface.h"
|
||||||
|
#include "fdbclient/TagThrottle.actor.h"
|
||||||
|
#include "fdbserver/GrvTransactionRateInfo.h"
|
||||||
|
|
||||||
|
// GrvProxyTransactionTagThrottler is used to throttle GetReadVersionRequests based on tag quotas
|
||||||
|
// before they're pushed into priority-partitioned queues.
|
||||||
|
//
|
||||||
|
// A GrvTransactionRateInfo object and a request queue are maintained for each tag.
|
||||||
|
// The GrvTransactionRateInfo object is used to determine when a request can be released.
|
||||||
|
//
|
||||||
|
// Between each set of waits, releaseTransactions is run, releasing queued transactions
|
||||||
|
// that have passed the tag throttling stage. Transactions that are not yet ready
|
||||||
|
// are requeued during releaseTransactions.
|
||||||
|
class GrvProxyTransactionTagThrottler {
|
||||||
|
struct DelayedRequest {
|
||||||
|
GetReadVersionRequest req;
|
||||||
|
double startTime;
|
||||||
|
|
||||||
|
explicit DelayedRequest(GetReadVersionRequest const& req, double startTime = now())
|
||||||
|
: req(req), startTime(startTime) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TagQueue {
|
||||||
|
Optional<GrvTransactionRateInfo> rateInfo;
|
||||||
|
Deque<DelayedRequest> requests;
|
||||||
|
|
||||||
|
explicit TagQueue(double rate = 0.0) : rateInfo(rate) {}
|
||||||
|
|
||||||
|
void releaseTransactions(double elapsed,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outDefaultPriority);
|
||||||
|
|
||||||
|
void setRate(double rate) {
|
||||||
|
if (rateInfo.present()) {
|
||||||
|
rateInfo.get().setRate(rate);
|
||||||
|
} else {
|
||||||
|
rateInfo = GrvTransactionRateInfo(rate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Track the budgets for each tag
|
||||||
|
TransactionTagMap<TagQueue> queues;
|
||||||
|
|
||||||
|
// These requests are simply passed through with no throttling
|
||||||
|
Deque<GetReadVersionRequest> untaggedRequests;
|
||||||
|
|
||||||
|
public:
|
||||||
|
// Called with rates received from ratekeeper
|
||||||
|
void updateRates(TransactionTagMap<double> const& newRates);
|
||||||
|
|
||||||
|
// elapsed indicates the amount of time since the last epoch was run.
|
||||||
|
// If a request is ready to be executed, it is sent to the deque
|
||||||
|
// corresponding to its priority. If not, the request remains queued.
|
||||||
|
void releaseTransactions(double elapsed,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||||
|
SpannedDeque<GetReadVersionRequest>& outDefaultPriority);
|
||||||
|
|
||||||
|
void addRequest(GetReadVersionRequest const&);
|
||||||
|
};
|
|
@ -1,52 +0,0 @@
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include "fdbclient/CommitProxyInterface.h"
|
|
||||||
#include "fdbclient/TagThrottle.actor.h"
|
|
||||||
#include "fdbserver/GrvTransactionRateInfo.h"
|
|
||||||
|
|
||||||
// TagQueue is used to throttle GetReadVersionRequests based on tag quotas
|
|
||||||
// before they're pushed into priority-partitioned queues.
|
|
||||||
//
|
|
||||||
// A GrvTransactionRateInfo object is maintained for each tag. This object
|
|
||||||
// is used to determine when a request can be released.
|
|
||||||
//
|
|
||||||
// Between each set of waits, runEpoch is run, releasing queued transactions
|
|
||||||
// that have passed the tag throttling stage. Transactions that are not yet ready
|
|
||||||
// are requeued during runEpoch.
|
|
||||||
class TagQueue {
|
|
||||||
struct DelayedRequest {
|
|
||||||
double startTime;
|
|
||||||
GetReadVersionRequest req;
|
|
||||||
explicit DelayedRequest(GetReadVersionRequest req) : startTime(now()), req(req) {}
|
|
||||||
double delayTime() const { return now() - startTime; }
|
|
||||||
};
|
|
||||||
|
|
||||||
// Track the budgets for each tag
|
|
||||||
TransactionTagMap<GrvTransactionRateInfo> rateInfos;
|
|
||||||
|
|
||||||
// Requests that have not yet been processed
|
|
||||||
Deque<GetReadVersionRequest> newRequests;
|
|
||||||
|
|
||||||
// Requests that have been delayed at least once
|
|
||||||
Deque<DelayedRequest> delayedRequests;
|
|
||||||
|
|
||||||
// Checks if count transactions can be released, given that
|
|
||||||
// alreadyReleased transactions have already been released in this epoch.
|
|
||||||
bool canStart(TransactionTag tag, int64_t alreadyReleased, int64_t count) const;
|
|
||||||
|
|
||||||
// Checks if a request can be released
|
|
||||||
bool canStart(GetReadVersionRequest req, TransactionTagMap<int64_t>& releasedInEpoch) const;
|
|
||||||
|
|
||||||
public:
|
|
||||||
// Called with rates received from ratekeeper
|
|
||||||
void updateRates(TransactionTagMap<double> const& newRates);
|
|
||||||
|
|
||||||
// elapsed indicates the amount of time since the last epoch was run.
|
|
||||||
// If a request is ready to be executed, it is sent to the deque
|
|
||||||
// corresponding to its priority. If not, the request remains queued.
|
|
||||||
void releaseTransactions(double elapsed,
|
|
||||||
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
|
||||||
SpannedDeque<GetReadVersionRequest>& outDefaultPriority);
|
|
||||||
|
|
||||||
void addRequest(GetReadVersionRequest);
|
|
||||||
};
|
|
Loading…
Reference in New Issue