Add comments to TagQueue class

This commit is contained in:
sfc-gh-tclinkenbeard 2022-10-06 07:37:58 -07:00
parent 69b3c09cf7
commit 8ff66d6fad
2 changed files with 41 additions and 30 deletions

View File

@ -19,19 +19,17 @@ void TagQueue::updateRates(TransactionTagMap<double> const& newRates) {
}
}
bool TagQueue::canStart(TransactionTag tag, int64_t count) const {
bool TagQueue::canStart(TransactionTag tag, int64_t alreadyReleased, int64_t count) const {
auto it = rateInfos.find(tag);
if (it == rateInfos.end()) {
return true;
}
auto it2 = releasedInEpoch.find(tag);
auto alreadyReleased = (it2 == releasedInEpoch.end() ? 0 : it2->second);
return it->second.canStart(alreadyReleased, count);
}
bool TagQueue::canStart(GetReadVersionRequest req) const {
bool TagQueue::canStart(GetReadVersionRequest req, TransactionTagMap<int64_t>& releasedInEpoch) const {
for (const auto& [tag, count] : req.tags) {
if (!canStart(tag, count)) {
if (!canStart(tag, releasedInEpoch[tag], count)) {
return false;
}
}
@ -42,29 +40,20 @@ void TagQueue::addRequest(GetReadVersionRequest req) {
newRequests.push_back(req);
}
void TagQueue::startEpoch() {
for (auto& [_, rateInfo] : rateInfos) {
rateInfo.startEpoch();
}
releasedInEpoch.clear();
}
void TagQueue::endEpoch(double elapsed) {
for (auto& [tag, rateInfo] : rateInfos) {
rateInfo.endEpoch(releasedInEpoch[tag], false, elapsed);
}
}
void TagQueue::runEpoch(double elapsed,
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
startEpoch();
for (auto& [_, rateInfo] : rateInfos) {
rateInfo.startEpoch();
}
Deque<DelayedRequest> newDelayedRequests;
TransactionTagMap<int64_t> releasedInEpoch;
while (!delayedRequests.empty()) {
auto& delayedReq = delayedRequests.front();
auto& req = delayedReq.req;
if (canStart(req)) {
if (canStart(req, releasedInEpoch)) {
for (const auto& [tag, count] : req.tags) {
releasedInEpoch[tag] += count;
}
@ -85,7 +74,7 @@ void TagQueue::runEpoch(double elapsed,
while (!newRequests.empty()) {
auto const& req = newRequests.front();
if (canStart(req)) {
if (canStart(req, releasedInEpoch)) {
for (const auto& [tag, count] : req.tags) {
releasedInEpoch[tag] += count;
}
@ -104,7 +93,9 @@ void TagQueue::runEpoch(double elapsed,
}
delayedRequests = std::move(newDelayedRequests);
endEpoch(elapsed);
for (auto& [tag, rateInfo] : rateInfos) {
rateInfo.endEpoch(std::move(releasedInEpoch)[tag], false, elapsed);
}
}
ACTOR static Future<Void> mockClient(TagQueue* tagQueue,

View File

@ -1,10 +1,18 @@
#pragma once
#include "fdbclient/CommitProxyInterface.h"
#include "fdbclient/TagThrottle.actor.h"
#include "fdbserver/GrvTransactionRateInfo.h"
#include <map>
// TagQueue is used to throttle GetReadVersionRequests based on tag quotas
// before they're pushed into priority-partitioned queues.
//
// A GrvTransactionRateInfo object is maintained for each tag. This object
// is used to determine when a request can be released.
//
// Between each set of waits, runEpoch is run, releasing queued transactions
// that have passed the tag throttling stage. Transactions that are not yet ready
// are requeued during runEpoch.
class TagQueue {
struct DelayedRequest {
double startTime;
@ -13,20 +21,32 @@ class TagQueue {
double delayTime() const { return now() - startTime; }
};
std::map<TransactionTag, GrvTransactionRateInfo> rateInfos;
std::map<TransactionTag, int64_t> releasedInEpoch;
// Track the budgets for each tag
TransactionTagMap<GrvTransactionRateInfo> rateInfos;
// Requests that have not yet been processed
Deque<GetReadVersionRequest> newRequests;
// Requests that have been delayed at least once
Deque<DelayedRequest> delayedRequests;
bool canStart(TransactionTag tag, int64_t count) const;
bool canStart(GetReadVersionRequest req) const;
void startEpoch();
void endEpoch(double elapsed);
// Checks if count transactions can be released, given that
// alreadyReleased transactions have already been released in this epoch.
bool canStart(TransactionTag tag, int64_t alreadyReleased, int64_t count) const;
// Checks if a request can be released
bool canStart(GetReadVersionRequest req, TransactionTagMap<int64_t>& releasedInEpoch) const;
public:
// Called with rates received from ratekeeper
void updateRates(TransactionTagMap<double> const& newRates);
// elapsed indicates the amount of time since the last epoch was run.
// If a request is ready to be executed, it is sent to the deque
// corresponding to its priority. If not, the request remains queued.
void runEpoch(double elapsed,
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
SpannedDeque<GetReadVersionRequest>& outDefaultPriority);
void addRequest(GetReadVersionRequest);
};