Make GrvProxyTransactionTagThrottler a FIFO queue for unthrottled transactions, even for transactions with different tags
This commit is contained in:
parent
f1cb4e40f5
commit
671c4fe0a2
|
@ -22,6 +22,20 @@
|
|||
#include "flow/UnitTest.h"
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
uint64_t GrvProxyTransactionTagThrottler::DelayedRequest::lastSequenceNumber = 0;
|
||||
|
||||
void GrvProxyTransactionTagThrottler::DelayedRequest::updateProxyTagThrottledDuration() {
|
||||
req.proxyTagThrottledDuration = now() - startTime;
|
||||
}
|
||||
|
||||
void GrvProxyTransactionTagThrottler::TagQueue::setRate(double rate) {
|
||||
if (rateInfo.present()) {
|
||||
rateInfo.get().setRate(rate);
|
||||
} else {
|
||||
rateInfo = GrvTransactionRateInfo(rate);
|
||||
}
|
||||
}
|
||||
|
||||
void GrvProxyTransactionTagThrottler::updateRates(TransactionTagMap<double> const& newRates) {
|
||||
for (const auto& [tag, rate] : newRates) {
|
||||
auto it = queues.find(tag);
|
||||
|
@ -65,44 +79,79 @@ void GrvProxyTransactionTagThrottler::addRequest(GetReadVersionRequest const& re
|
|||
queues[tag].requests.emplace_back(req);
|
||||
}
|
||||
|
||||
void GrvProxyTransactionTagThrottler::TagQueue::releaseTransactions(
|
||||
double elapsed,
|
||||
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
|
||||
Deque<DelayedRequest> newDelayedRequests;
|
||||
if (rateInfo.present())
|
||||
rateInfo.get().startReleaseWindow();
|
||||
int transactionsReleased = 0;
|
||||
while (!requests.empty()) {
|
||||
auto& delayedReq = requests.front();
|
||||
auto& req = delayedReq.req;
|
||||
auto const count = req.tags.begin()->second;
|
||||
if (!rateInfo.present() || rateInfo.get().canStart(transactionsReleased, count)) {
|
||||
req.proxyTagThrottledDuration = now() - delayedReq.startTime;
|
||||
transactionsReleased += count;
|
||||
if (req.priority == TransactionPriority::BATCH) {
|
||||
outBatchPriority.push_back(req);
|
||||
} else if (req.priority == TransactionPriority::DEFAULT) {
|
||||
outDefaultPriority.push_back(req);
|
||||
} else {
|
||||
// Immediate priority transactions should bypass the GrvProxyTransactionTagThrottler
|
||||
ASSERT(false);
|
||||
}
|
||||
} else {
|
||||
newDelayedRequests.push_back(delayedReq);
|
||||
}
|
||||
requests.pop_front();
|
||||
}
|
||||
if (rateInfo.present())
|
||||
rateInfo.get().endReleaseWindow(transactionsReleased, false, elapsed);
|
||||
requests = std::move(newDelayedRequests);
|
||||
}
|
||||
|
||||
void GrvProxyTransactionTagThrottler::releaseTransactions(double elapsed,
|
||||
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
|
||||
for (auto& [_, tagQueue] : queues) {
|
||||
tagQueue.releaseTransactions(elapsed, outBatchPriority, outDefaultPriority);
|
||||
struct TagInfo {
|
||||
// Store pointers here to avoid frequent std::unordered_map lookups
|
||||
TagQueue* queue;
|
||||
uint32_t* numReleased;
|
||||
// Sequence number of the first queued request
|
||||
int64_t nextSeqNo;
|
||||
bool operator<(TagInfo const& rhs) const { return nextSeqNo < rhs.nextSeqNo; }
|
||||
explicit TagInfo(TagQueue& queue, uint32_t& numReleased) : queue(&queue), numReleased(&numReleased) {
|
||||
ASSERT(!this->queue->requests.empty());
|
||||
nextSeqNo = this->queue->requests.front().sequenceNumber;
|
||||
}
|
||||
};
|
||||
|
||||
// Track transactions released for each tag
|
||||
TransactionTagMap<uint32_t> transactionsReleased;
|
||||
|
||||
std::priority_queue<TagInfo> pq;
|
||||
for (auto& [tag, queue] : queues) {
|
||||
if (queue.rateInfo.present()) {
|
||||
queue.rateInfo.get().startReleaseWindow();
|
||||
}
|
||||
if (!queue.requests.empty()) {
|
||||
pq.emplace(queue, transactionsReleased[tag]);
|
||||
}
|
||||
}
|
||||
|
||||
while (!pq.empty()) {
|
||||
auto info = pq.top();
|
||||
pq.pop();
|
||||
// Used to determine when it is time to start processing another tag
|
||||
auto const nextQueueSeqNo = pq.empty() ? std::numeric_limits<int64_t>::max() : pq.top().nextSeqNo;
|
||||
|
||||
while (!info.queue->requests.empty()) {
|
||||
auto& delayedReq = info.queue->requests.front();
|
||||
auto count = delayedReq.req.tags.begin()->second;
|
||||
ASSERT_EQ(info.nextSeqNo, delayedReq.sequenceNumber);
|
||||
if (info.queue->rateInfo.present() && !info.queue->rateInfo.get().canStart(*(info.numReleased), count)) {
|
||||
// Cannot release any more transaction from this tag (don't push the info back into pq)
|
||||
CODE_PROBE(true, "GrvProxyTransactionTagThrottler::releaseTransactions : Throttling transaction");
|
||||
break;
|
||||
} else {
|
||||
if (info.nextSeqNo < nextQueueSeqNo) {
|
||||
// Releasing transaction
|
||||
*(info.numReleased) += count;
|
||||
delayedReq.updateProxyTagThrottledDuration();
|
||||
if (delayedReq.req.priority == TransactionPriority::BATCH) {
|
||||
outBatchPriority.push_back(delayedReq.req);
|
||||
} else if (delayedReq.req.priority == TransactionPriority::DEFAULT) {
|
||||
outDefaultPriority.push_back(delayedReq.req);
|
||||
} else {
|
||||
// Immediate priority transactions should bypass the GrvProxyTransactionTagThrottler
|
||||
ASSERT(false);
|
||||
}
|
||||
info.queue->requests.pop_front();
|
||||
if (!info.queue->requests.empty()) {
|
||||
info.nextSeqNo = info.queue->requests.front().sequenceNumber;
|
||||
}
|
||||
} else {
|
||||
CODE_PROBE(
|
||||
true, "GrvProxyTransactionTagThrottler::releaseTransactions : Switching tags to preserve FIFO");
|
||||
pq.push(info);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (auto& [tag, queue] : queues) {
|
||||
if (queue.rateInfo.present()) {
|
||||
queue.rateInfo.get().endReleaseWindow(transactionsReleased[tag], false, elapsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,10 @@ GrvTransactionRateInfo::GrvTransactionRateInfo(double rate)
|
|||
}
|
||||
|
||||
bool GrvTransactionRateInfo::canStart(int64_t numAlreadyStarted, int64_t count) const {
|
||||
TraceEvent("HERE_CanStart")
|
||||
.detail("Limit", limit)
|
||||
.detail("Budget", budget)
|
||||
.detail("Rate", smoothRate.smoothTotal());
|
||||
return numAlreadyStarted + count <=
|
||||
std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
|
||||
}
|
||||
|
|
|
@ -34,12 +34,18 @@
|
|||
// that have passed the tag throttling stage. Transactions that are not yet ready
|
||||
// are requeued during releaseTransactions.
|
||||
class GrvProxyTransactionTagThrottler {
|
||||
struct DelayedRequest {
|
||||
GetReadVersionRequest req;
|
||||
class DelayedRequest {
|
||||
static uint64_t lastSequenceNumber;
|
||||
double startTime;
|
||||
|
||||
explicit DelayedRequest(GetReadVersionRequest const& req, double startTime = now())
|
||||
: req(req), startTime(startTime) {}
|
||||
public:
|
||||
GetReadVersionRequest req;
|
||||
uint64_t sequenceNumber;
|
||||
|
||||
explicit DelayedRequest(GetReadVersionRequest const& req)
|
||||
: req(req), startTime(now()), sequenceNumber(++lastSequenceNumber) {}
|
||||
|
||||
void updateProxyTagThrottledDuration();
|
||||
};
|
||||
|
||||
struct TagQueue {
|
||||
|
@ -48,17 +54,7 @@ class GrvProxyTransactionTagThrottler {
|
|||
|
||||
explicit TagQueue(double rate = 0.0) : rateInfo(rate) {}
|
||||
|
||||
void releaseTransactions(double elapsed,
|
||||
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
|
||||
SpannedDeque<GetReadVersionRequest>& outDefaultPriority);
|
||||
|
||||
void setRate(double rate) {
|
||||
if (rateInfo.present()) {
|
||||
rateInfo.get().setRate(rate);
|
||||
} else {
|
||||
rateInfo = GrvTransactionRateInfo(rate);
|
||||
}
|
||||
}
|
||||
void setRate(double rate);
|
||||
};
|
||||
|
||||
// Track the budgets for each tag
|
||||
|
|
Loading…
Reference in New Issue