From 24a0dd9f173f2cef85f16407762cecb925fe354c Mon Sep 17 00:00:00 2001 From: sfc-gh-tclinkenbeard Date: Fri, 7 Oct 2022 10:31:36 -0700 Subject: [PATCH] Change GrvProxy tag throttling algorithm. The new algorithm assumes there is only one tag per request, so queues are partitioned by tag. This is a more efficient approach than the old algorithm. --- fdbserver/GrvProxyServer.actor.cpp | 20 +- .../GrvProxyTransactionTagThrottler.actor.cpp | 196 +++++++++++++++ fdbserver/TagQueue.actor.cpp | 235 ------------------ .../GrvProxyTransactionTagThrottler.h | 62 +++++ fdbserver/include/fdbserver/TagQueue.h | 52 ---- 5 files changed, 268 insertions(+), 297 deletions(-) create mode 100644 fdbserver/GrvProxyTransactionTagThrottler.actor.cpp delete mode 100644 fdbserver/TagQueue.actor.cpp create mode 100644 fdbserver/include/fdbserver/GrvProxyTransactionTagThrottler.h delete mode 100644 fdbserver/include/fdbserver/TagQueue.h diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index f1f04581e1..456e8a7110 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -27,10 +27,10 @@ #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/GrvProxyInterface.h" #include "fdbclient/VersionVector.h" +#include "fdbserver/GrvProxyTransactionTagThrottler.h" #include "fdbserver/GrvTransactionRateInfo.h" #include "fdbserver/LogSystem.h" #include "fdbserver/LogSystemDiskQueueAdapter.h" -#include "fdbserver/TagQueue.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbrpc/sim_validation.h" @@ -362,7 +362,7 @@ ACTOR Future getRate(UID myID, GetHealthMetricsReply* detailedHealthMetricsReply, TransactionTagMap* transactionTagCounter, PrioritizedTransactionTagMap* clientThrottledTags, - TagQueue* tagQueue, + GrvProxyTransactionTagThrottler* tagThrottler, GrvProxyStats* stats, GrvProxyData* proxyData) { state Future nextRequestTimer = Never(); @@ -423,7 +423,7 @@ ACTOR Future getRate(UID myID, *clientThrottledTags = std::move(rep.clientThrottledTags.get()); } if (rep.proxyThrottledTags.present()) { - tagQueue->updateRates(rep.proxyThrottledTags.get()); + tagThrottler->updateRates(rep.proxyThrottledTags.get()); } } when(wait(leaseTimeout)) { @@ -469,7 +469,7 @@ ACTOR Future queueGetReadVersionRequests(Reference GrvProxyStats* stats, GrvTransactionRateInfo* batchRateInfo, TransactionTagMap* transactionTagCounter, - TagQueue* tagQueue) { + GrvProxyTransactionTagThrottler* tagThrottler) { getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetConsistentReadVersion; loop choose { @@ -537,7 +537,7 @@ ACTOR Future queueGetReadVersionRequests(Reference stats->txnDefaultPriorityStartIn += req.transactionCount; ++stats->defaultGRVQueueSize; if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES) { - tagQueue->addRequest(req); + tagThrottler->addRequest(req); } else { defaultQueue->push_back(req); } @@ -554,7 +554,7 @@ ACTOR Future queueGetReadVersionRequests(Reference stats->txnBatchPriorityStartIn += req.transactionCount; ++stats->batchGRVQueueSize; if (SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES) { - tagQueue->addRequest(req); + tagThrottler->addRequest(req); } else { batchQueue->push_back(req); } @@ -823,7 +823,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, state int64_t batchTransactionCount = 0; state GrvTransactionRateInfo normalRateInfo(10); state GrvTransactionRateInfo batchRateInfo(0); - state TagQueue tagQueue; + state GrvProxyTransactionTagThrottler tagThrottler; state SpannedDeque systemQueue("GP:transactionStarterSystemQueue"_loc); state SpannedDeque defaultQueue("GP:transactionStarterDefaultQueue"_loc); @@ -850,7 +850,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, detailedHealthMetricsReply, &transactionTagCounter, &clientThrottledTags, - &tagQueue, + &tagThrottler, &grvProxyData->stats, grvProxyData)); addActor.send(queueGetReadVersionRequests(db, @@ -865,7 +865,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, &grvProxyData->stats, &batchRateInfo, &transactionTagCounter, - &tagQueue)); + &tagThrottler)); while (std::find(db->get().client.grvProxies.begin(), db->get().client.grvProxies.end(), proxy) == db->get().client.grvProxies.end()) { @@ -888,7 +888,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, elapsed = 1e-15; } - tagQueue.releaseTransactions(elapsed, defaultQueue, batchQueue); + tagThrottler.releaseTransactions(elapsed, defaultQueue, batchQueue); normalRateInfo.startReleaseWindow(); batchRateInfo.startReleaseWindow(); diff --git a/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp b/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp new file mode 100644 index 0000000000..4583f06993 --- /dev/null +++ b/fdbserver/GrvProxyTransactionTagThrottler.actor.cpp @@ -0,0 +1,196 @@ +#include "fdbserver/GrvProxyTransactionTagThrottler.h" +#include "flow/UnitTest.h" +#include "flow/actorcompiler.h" // must be last include + +void GrvProxyTransactionTagThrottler::updateRates(TransactionTagMap const& newRates) { + for (const auto& [tag, rate] : newRates) { + auto it = queues.find(tag); + if (it == queues.end()) { + queues[tag] = TagQueue(rate); + } else { + it->second.setRate(rate); + } + } + + // Clean up tags that did not appear in newRates + for (auto& [tag, queue] : queues) { + if (newRates.find(tag) == newRates.end()) { + queue.rateInfo.reset(); + if (queue.requests.empty()) { + // FIXME: Use cleaner method of cleanup + queues.erase(tag); + } + } + } +} + +void GrvProxyTransactionTagThrottler::addRequest(GetReadVersionRequest const& req) { + if (req.tags.empty()) { + untaggedRequests.push_back(req); + } else { + auto const& tag = req.tags.begin()->first; + if (req.tags.size() > 1) { + // The GrvProxyTransactionTagThrottler assumes that each GetReadVersionRequest + // has at most one tag. If a transaction uses multiple tags and + // SERVER_KNOBS->ENFORCE_TAG_THROTTLING_ON_PROXIES is enabled, there may be + // unexpected behaviour, because only one tag is used for throttling. + TraceEvent(SevWarnAlways, "GrvProxyTransactionTagThrottler_MultipleTags") + .detail("NumTags", req.tags.size()) + .detail("UsingTag", printable(tag)); + } + queues[tag].requests.emplace_back(req); + } +} + +void GrvProxyTransactionTagThrottler::TagQueue::releaseTransactions( + double elapsed, + SpannedDeque& outBatchPriority, + SpannedDeque& outDefaultPriority) { + Deque newDelayedRequests; + if (rateInfo.present()) + rateInfo.get().startReleaseWindow(); + int transactionsReleased = 0; + while (!requests.empty()) { + auto& delayedReq = requests.front(); + auto& req = delayedReq.req; + auto const count = req.tags.begin()->second; + if (!rateInfo.present() || rateInfo.get().canStart(transactionsReleased, count)) { + req.proxyTagThrottledDuration = now() - delayedReq.startTime; + transactionsReleased += count; + if (req.priority == TransactionPriority::BATCH) { + outBatchPriority.push_back(req); + } else if (req.priority == TransactionPriority::DEFAULT) { + outDefaultPriority.push_back(req); + } else { + // Immediate priority transactions should bypass the GrvProxyTransactionTagThrottler + ASSERT(false); + } + } else { + newDelayedRequests.push_back(delayedReq); + } + requests.pop_front(); + } + if (rateInfo.present()) + rateInfo.get().endReleaseWindow(transactionsReleased, false, elapsed); + requests = std::move(newDelayedRequests); +} + +void GrvProxyTransactionTagThrottler::releaseTransactions(double elapsed, + SpannedDeque& outBatchPriority, + SpannedDeque& outDefaultPriority) { + for (auto& [_, tagQueue] : queues) { + tagQueue.releaseTransactions(elapsed, outBatchPriority, outDefaultPriority); + } +} + +ACTOR static Future mockClient(GrvProxyTransactionTagThrottler* throttler, + TransactionPriority priority, + TagSet tagSet, + int batchSize, + double desiredRate, + TransactionTagMap* counters) { + state Future timer; + state TransactionTagMap tags; + for (const auto& tag : tagSet) { + tags[tag] = batchSize; + } + loop { + timer = delayJittered(static_cast(batchSize) / desiredRate); + GetReadVersionRequest req; + req.tags = tags; + req.priority = priority; + throttler->addRequest(req); + wait(success(req.reply.getFuture()) && timer); + for (auto& [tag, _] : tags) { + (*counters)[tag] += batchSize; + } + } +} + +ACTOR static Future mockServer(GrvProxyTransactionTagThrottler* throttler) { + state SpannedDeque outBatchPriority("TestGrvProxyTransactionTagThrottler_Batch"_loc); + state SpannedDeque outDefaultPriority("TestGrvProxyTransactionTagThrottler_Default"_loc); + loop { + state double elapsed = (0.009 + 0.002 * deterministicRandom()->random01()); + wait(delay(elapsed)); + throttler->releaseTransactions(elapsed, outBatchPriority, outDefaultPriority); + while (!outBatchPriority.empty()) { + outBatchPriority.front().reply.send(GetReadVersionReply{}); + outBatchPriority.pop_front(); + } + while (!outDefaultPriority.empty()) { + outDefaultPriority.front().reply.send(GetReadVersionReply{}); + outDefaultPriority.pop_front(); + } + } +} + +static bool isNear(double desired, int64_t actual) { + return std::abs(desired - actual) * 10 < desired; +} + +// Rate limit set at 10, but client attempts 20 transactions per second. +// Client should be throttled to only 10 transactions per second. +TEST_CASE("/GrvProxyTransactionTagThrottler/Simple") { + state GrvProxyTransactionTagThrottler throttler; + state TagSet tagSet; + state TransactionTagMap counters; + { + TransactionTagMap rates; + rates["sampleTag"_sr] = 10.0; + throttler.updateRates(rates); + } + tagSet.addTag("sampleTag"_sr); + + state Future client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters); + state Future server = mockServer(&throttler); + wait(timeout(client && server, 60.0, Void())); + TraceEvent("TagQuotaTest_Simple").detail("Counter", counters["sampleTag"_sr]); + ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0)); + return Void(); +} + +// Clients share the available 30 transaction/second budget +TEST_CASE("/GrvProxyTransactionTagThrottler/MultiClient") { + state GrvProxyTransactionTagThrottler throttler; + state TagSet tagSet; + state TransactionTagMap counters; + { + TransactionTagMap rates; + rates["sampleTag"_sr] = 30.0; + throttler.updateRates(rates); + } + tagSet.addTag("sampleTag"_sr); + + state std::vector> clients; + clients.reserve(10); + for (int i = 0; i < 10; ++i) { + clients.push_back(mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 1, 10.0, &counters)); + } + + state Future server = mockServer(&throttler); + wait(timeout(waitForAll(clients) && server, 60.0, Void())); + TraceEvent("TagQuotaTest_MultiClient").detail("Counter", counters["sampleTag"_sr]); + ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 30.0)); + return Void(); +} + +TEST_CASE("/GrvProxyTransactionTagThrottler/Batch") { + state GrvProxyTransactionTagThrottler throttler; + state TagSet tagSet; + state TransactionTagMap counters; + { + TransactionTagMap rates; + rates["sampleTag"_sr] = 10.0; + throttler.updateRates(rates); + } + tagSet.addTag("sampleTag"_sr); + + state Future client = mockClient(&throttler, TransactionPriority::DEFAULT, tagSet, 5, 20.0, &counters); + state Future server = mockServer(&throttler); + wait(timeout(client && server, 60.0, Void())); + + TraceEvent("TagQuotaTest_Batch").detail("Counter", counters["sampleTag"_sr]); + ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0)); + return Void(); +} diff --git a/fdbserver/TagQueue.actor.cpp b/fdbserver/TagQueue.actor.cpp deleted file mode 100644 index e0dc9ec7df..0000000000 --- a/fdbserver/TagQueue.actor.cpp +++ /dev/null @@ -1,235 +0,0 @@ -#include "fdbserver/TagQueue.h" -#include "flow/UnitTest.h" -#include "flow/actorcompiler.h" // must be last include - -void TagQueue::updateRates(TransactionTagMap const& newRates) { - for (const auto& [tag, rate] : newRates) { - auto it = rateInfos.find(tag); - if (it == rateInfos.end()) { - rateInfos[tag] = GrvTransactionRateInfo(rate); - } else { - it->second.setRate(rate); - } - } - - for (const auto& [tag, _] : rateInfos) { - if (newRates.find(tag) == newRates.end()) { - rateInfos.erase(tag); - } - } -} - -bool TagQueue::canStart(TransactionTag tag, int64_t alreadyReleased, int64_t count) const { - auto it = rateInfos.find(tag); - if (it == rateInfos.end()) { - return true; - } - return it->second.canStart(alreadyReleased, count); -} - -bool TagQueue::canStart(GetReadVersionRequest req, TransactionTagMap& releasedInEpoch) const { - for (const auto& [tag, count] : req.tags) { - if (!canStart(tag, releasedInEpoch[tag], count)) { - return false; - } - } - return true; -} - -void TagQueue::addRequest(GetReadVersionRequest req) { - newRequests.push_back(req); -} - -void TagQueue::releaseTransactions(double elapsed, - SpannedDeque& outBatchPriority, - SpannedDeque& outDefaultPriority) { - for (auto& [_, rateInfo] : rateInfos) { - rateInfo.startReleaseWindow(); - } - - Deque newDelayedRequests; - TransactionTagMap releasedInEpoch; - - while (!delayedRequests.empty()) { - auto& delayedReq = delayedRequests.front(); - auto& req = delayedReq.req; - if (canStart(req, releasedInEpoch)) { - for (const auto& [tag, count] : req.tags) { - releasedInEpoch[tag] += count; - } - req.proxyTagThrottledDuration = delayedReq.delayTime(); - if (req.priority == TransactionPriority::BATCH) { - outBatchPriority.push_back(req); - } else if (req.priority == TransactionPriority::DEFAULT) { - outDefaultPriority.push_back(req); - } else { - // Immediate priority transactions should bypass the TagQueue - ASSERT(false); - } - } else { - newDelayedRequests.push_back(delayedReq); - } - delayedRequests.pop_front(); - } - - while (!newRequests.empty()) { - auto const& req = newRequests.front(); - if (canStart(req, releasedInEpoch)) { - for (const auto& [tag, count] : req.tags) { - releasedInEpoch[tag] += count; - } - if (req.priority == TransactionPriority::BATCH) { - outBatchPriority.push_back(req); - } else if (req.priority == TransactionPriority::DEFAULT) { - outDefaultPriority.push_back(req); - } else { - // Immediate priority transactions should bypass the TagQueue - ASSERT(false); - } - } else { - newDelayedRequests.emplace_back(req); - } - newRequests.pop_front(); - } - - delayedRequests = std::move(newDelayedRequests); - for (auto& [tag, rateInfo] : rateInfos) { - rateInfo.endReleaseWindow(std::move(releasedInEpoch)[tag], false, elapsed); - } -} - -ACTOR static Future mockClient(TagQueue* tagQueue, - TransactionPriority priority, - TagSet tagSet, - int batchSize, - double desiredRate, - TransactionTagMap* counters) { - state Future timer; - state TransactionTagMap tags; - for (const auto& tag : tagSet) { - tags[tag] = batchSize; - } - loop { - timer = delayJittered(static_cast(batchSize) / desiredRate); - GetReadVersionRequest req; - req.tags = tags; - req.priority = priority; - tagQueue->addRequest(req); - wait(success(req.reply.getFuture()) && timer); - for (auto& [tag, _] : tags) { - (*counters)[tag] += batchSize; - } - } -} - -ACTOR static Future mockServer(TagQueue* tagQueue) { - state SpannedDeque outBatchPriority("TestTagQueue_Batch"_loc); - state SpannedDeque outDefaultPriority("TestTagQueue_Default"_loc); - loop { - state double elapsed = (0.009 + 0.002 * deterministicRandom()->random01()); - wait(delay(elapsed)); - tagQueue->releaseTransactions(elapsed, outBatchPriority, outDefaultPriority); - while (!outBatchPriority.empty()) { - outBatchPriority.front().reply.send(GetReadVersionReply{}); - outBatchPriority.pop_front(); - } - while (!outDefaultPriority.empty()) { - outDefaultPriority.front().reply.send(GetReadVersionReply{}); - outDefaultPriority.pop_front(); - } - } -} - -static bool isNear(double desired, int64_t actual) { - return std::abs(desired - actual) * 10 < desired; -} - -// Rate limit set at 10, but client attempts 20 transactions per second. -// Client should be throttled to only 10 transactions per second. -TEST_CASE("/TagQueue/Simple") { - state TagQueue tagQueue; - state TagSet tagSet; - state TransactionTagMap counters; - { - TransactionTagMap rates; - rates["sampleTag"_sr] = 10.0; - tagQueue.updateRates(rates); - } - tagSet.addTag("sampleTag"_sr); - - state Future client = mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 1, 20.0, &counters); - state Future server = mockServer(&tagQueue); - wait(timeout(client && server, 60.0, Void())); - TraceEvent("TagQuotaTest_Simple").detail("Counter", counters["sampleTag"_sr]); - ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0)); - return Void(); -} - -// Throttle based on the tag with the lowest rate -TEST_CASE("/TagQueue/MultiTag") { - state TagQueue tagQueue; - state TagSet tagSet; - state TransactionTagMap counters; - { - TransactionTagMap rates; - rates["sampleTag1"_sr] = 10.0; - rates["sampleTag2"_sr] = 20.0; - tagQueue.updateRates(rates); - } - tagSet.addTag("sampleTag1"_sr); - tagSet.addTag("sampleTag2"_sr); - - state Future client = mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 1, 30.0, &counters); - state Future server = mockServer(&tagQueue); - wait(timeout(client && server, 60.0, Void())); - TraceEvent("TagQuotaTest_MultiTag").detail("Counter", counters["sampleTag1"_sr]); - ASSERT_EQ(counters["sampleTag1"_sr], counters["sampleTag2"_sr]); - ASSERT(isNear(counters["sampleTag1"_sr], 60.0 * 10.0)); - - return Void(); -} - -// Clients share the available 30 transaction/second budget -TEST_CASE("/TagQueue/MultiClient") { - state TagQueue tagQueue; - state TagSet tagSet; - state TransactionTagMap counters; - { - TransactionTagMap rates; - rates["sampleTag"_sr] = 30.0; - tagQueue.updateRates(rates); - } - tagSet.addTag("sampleTag"_sr); - - state std::vector> clients; - clients.reserve(10); - for (int i = 0; i < 10; ++i) { - clients.push_back(mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 1, 10.0, &counters)); - } - - state Future server = mockServer(&tagQueue); - wait(timeout(waitForAll(clients) && server, 60.0, Void())); - TraceEvent("TagQuotaTest_MultiClient").detail("Counter", counters["sampleTag"_sr]); - ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 30.0)); - return Void(); -} - -TEST_CASE("/TagQueue/Batch") { - state TagQueue tagQueue; - state TagSet tagSet; - state TransactionTagMap counters; - { - TransactionTagMap rates; - rates["sampleTag"_sr] = 10.0; - tagQueue.updateRates(rates); - } - tagSet.addTag("sampleTag"_sr); - - state Future client = mockClient(&tagQueue, TransactionPriority::DEFAULT, tagSet, 5, 20.0, &counters); - state Future server = mockServer(&tagQueue); - wait(timeout(client && server, 60.0, Void())); - - TraceEvent("TagQuotaTest_Batch").detail("Counter", counters["sampleTag"_sr]); - ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 10.0)); - return Void(); -} diff --git a/fdbserver/include/fdbserver/GrvProxyTransactionTagThrottler.h b/fdbserver/include/fdbserver/GrvProxyTransactionTagThrottler.h new file mode 100644 index 0000000000..12542e682e --- /dev/null +++ b/fdbserver/include/fdbserver/GrvProxyTransactionTagThrottler.h @@ -0,0 +1,62 @@ +#pragma once + +#include "fdbclient/CommitProxyInterface.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbserver/GrvTransactionRateInfo.h" + +// GrvProxyTransactionTagThrottler is used to throttle GetReadVersionRequests based on tag quotas +// before they're pushed into priority-partitioned queues. +// +// A GrvTransactionRateInfo object and a request queue are maintained for each tag. +// The GrvTransactionRateInfo object is used to determine when a request can be released. +// +// Between each set of waits, releaseTransactions is run, releasing queued transactions +// that have passed the tag throttling stage. Transactions that are not yet ready +// are requeued during releaseTransactions. +class GrvProxyTransactionTagThrottler { + struct DelayedRequest { + GetReadVersionRequest req; + double startTime; + + explicit DelayedRequest(GetReadVersionRequest const& req, double startTime = now()) + : req(req), startTime(startTime) {} + }; + + struct TagQueue { + Optional rateInfo; + Deque requests; + + explicit TagQueue(double rate = 0.0) : rateInfo(rate) {} + + void releaseTransactions(double elapsed, + SpannedDeque& outBatchPriority, + SpannedDeque& outDefaultPriority); + + void setRate(double rate) { + if (rateInfo.present()) { + rateInfo.get().setRate(rate); + } else { + rateInfo = GrvTransactionRateInfo(rate); + } + } + }; + + // Track the budgets for each tag + TransactionTagMap queues; + + // These requests are simply passed through with no throttling + Deque untaggedRequests; + +public: + // Called with rates received from ratekeeper + void updateRates(TransactionTagMap const& newRates); + + // elapsed indicates the amount of time since the last epoch was run. + // If a request is ready to be executed, it is sent to the deque + // corresponding to its priority. If not, the request remains queued. + void releaseTransactions(double elapsed, + SpannedDeque& outBatchPriority, + SpannedDeque& outDefaultPriority); + + void addRequest(GetReadVersionRequest const&); +}; diff --git a/fdbserver/include/fdbserver/TagQueue.h b/fdbserver/include/fdbserver/TagQueue.h deleted file mode 100644 index a72db8bdee..0000000000 --- a/fdbserver/include/fdbserver/TagQueue.h +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once - -#include "fdbclient/CommitProxyInterface.h" -#include "fdbclient/TagThrottle.actor.h" -#include "fdbserver/GrvTransactionRateInfo.h" - -// TagQueue is used to throttle GetReadVersionRequests based on tag quotas -// before they're pushed into priority-partitioned queues. -// -// A GrvTransactionRateInfo object is maintained for each tag. This object -// is used to determine when a request can be released. -// -// Between each set of waits, runEpoch is run, releasing queued transactions -// that have passed the tag throttling stage. Transactions that are not yet ready -// are requeued during runEpoch. -class TagQueue { - struct DelayedRequest { - double startTime; - GetReadVersionRequest req; - explicit DelayedRequest(GetReadVersionRequest req) : startTime(now()), req(req) {} - double delayTime() const { return now() - startTime; } - }; - - // Track the budgets for each tag - TransactionTagMap rateInfos; - - // Requests that have not yet been processed - Deque newRequests; - - // Requests that have been delayed at least once - Deque delayedRequests; - - // Checks if count transactions can be released, given that - // alreadyReleased transactions have already been released in this epoch. - bool canStart(TransactionTag tag, int64_t alreadyReleased, int64_t count) const; - - // Checks if a request can be released - bool canStart(GetReadVersionRequest req, TransactionTagMap& releasedInEpoch) const; - -public: - // Called with rates received from ratekeeper - void updateRates(TransactionTagMap const& newRates); - - // elapsed indicates the amount of time since the last epoch was run. - // If a request is ready to be executed, it is sent to the deque - // corresponding to its priority. If not, the request remains queued. - void releaseTransactions(double elapsed, - SpannedDeque& outBatchPriority, - SpannedDeque& outDefaultPriority); - - void addRequest(GetReadVersionRequest); -};