Remove outSystemPriority parameter from TagQueue::runEpoch

This commit is contained in:
sfc-gh-tclinkenbeard 2022-10-05 14:54:50 -07:00
parent e313edbb85
commit 34857bd2bf
3 changed files with 6 additions and 39 deletions

View File

@ -888,8 +888,7 @@ ACTOR static Future<Void> transactionStarter(GrvProxyInterface proxy,
elapsed = 1e-15;
}
// TODO: Remove systemQueue parameter?
tagQueue.runEpoch(elapsed, defaultQueue, batchQueue, systemQueue);
tagQueue.runEpoch(elapsed, defaultQueue, batchQueue);
normalRateInfo.startEpoch();
batchRateInfo.startEpoch();

View File

@ -30,9 +30,6 @@ bool TagQueue::canStart(TransactionTag tag, int64_t count) const {
}
bool TagQueue::canStart(GetReadVersionRequest req) const {
if (req.priority == TransactionPriority::IMMEDIATE) {
return true;
}
for (const auto& [tag, count] : req.tags) {
if (!canStart(tag, count)) {
return false;
@ -60,8 +57,7 @@ void TagQueue::endEpoch(double elapsed) {
void TagQueue::runEpoch(double elapsed,
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
SpannedDeque<GetReadVersionRequest>& outDefaultPriority,
SpannedDeque<GetReadVersionRequest>& outImmediatePriority) {
SpannedDeque<GetReadVersionRequest>& outDefaultPriority) {
startEpoch();
Deque<DelayedRequest> newDelayedRequests;
@ -77,9 +73,8 @@ void TagQueue::runEpoch(double elapsed,
outBatchPriority.push_back(req);
} else if (req.priority == TransactionPriority::DEFAULT) {
outDefaultPriority.push_back(req);
} else if (req.priority == TransactionPriority::IMMEDIATE) {
outImmediatePriority.push_back(req);
} else {
// Immediate priority transactions should bypass the TagQueue
ASSERT(false);
}
} else {
@ -98,9 +93,8 @@ void TagQueue::runEpoch(double elapsed,
outBatchPriority.push_back(req);
} else if (req.priority == TransactionPriority::DEFAULT) {
outDefaultPriority.push_back(req);
} else if (req.priority == TransactionPriority::IMMEDIATE) {
outImmediatePriority.push_back(req);
} else {
// Immediate priority transactions should bypass the TagQueue
ASSERT(false);
}
} else {
@ -140,11 +134,10 @@ ACTOR static Future<Void> mockClient(TagQueue* tagQueue,
ACTOR static Future<Void> mockServer(TagQueue* tagQueue) {
state SpannedDeque<GetReadVersionRequest> outBatchPriority("TestTagQueue_Batch"_loc);
state SpannedDeque<GetReadVersionRequest> outDefaultPriority("TestTagQueue_Default"_loc);
state SpannedDeque<GetReadVersionRequest> outImmediatePriority("TestTagQueue_Immediate"_loc);
loop {
state double elapsed = (0.009 + 0.002 * deterministicRandom()->random01());
wait(delay(elapsed));
tagQueue->runEpoch(elapsed, outBatchPriority, outDefaultPriority, outImmediatePriority);
tagQueue->runEpoch(elapsed, outBatchPriority, outDefaultPriority);
while (!outBatchPriority.empty()) {
outBatchPriority.front().reply.send(GetReadVersionReply{});
outBatchPriority.pop_front();
@ -153,10 +146,6 @@ ACTOR static Future<Void> mockServer(TagQueue* tagQueue) {
outDefaultPriority.front().reply.send(GetReadVersionReply{});
outDefaultPriority.pop_front();
}
while (!outImmediatePriority.empty()) {
outImmediatePriority.front().reply.send(GetReadVersionReply{});
outImmediatePriority.pop_front();
}
}
}
@ -185,26 +174,6 @@ TEST_CASE("/TagQueue/Simple") {
return Void();
}
// Immediate-priority transactions are not throttled by the TagQueue
TEST_CASE("/TagQueue/Immediate") {
state TagQueue tagQueue;
state TagSet tagSet;
state TransactionTagMap<uint32_t> counters;
{
TransactionTagMap<double> rates;
rates["sampleTag"_sr] = 10.0;
tagQueue.updateRates(rates);
}
tagSet.addTag("sampleTag"_sr);
state Future<Void> client = mockClient(&tagQueue, TransactionPriority::IMMEDIATE, tagSet, 1, 20.0, &counters);
state Future<Void> server = mockServer(&tagQueue);
wait(timeout(client && server, 60.0, Void()));
TraceEvent("TagQuotaTest_Immediate").detail("Counter", counters["sampleTag"_sr]);
ASSERT(isNear(counters["sampleTag"_sr], 60.0 * 20.0));
return Void();
}
// Throttle based on the tag with the lowest rate
TEST_CASE("/TagQueue/MultiTag") {
state TagQueue tagQueue;

View File

@ -27,7 +27,6 @@ public:
void updateRates(TransactionTagMap<double> const& newRates);
void runEpoch(double elapsed,
SpannedDeque<GetReadVersionRequest>& outBatchPriority,
SpannedDeque<GetReadVersionRequest>& outDefaultPriority,
SpannedDeque<GetReadVersionRequest>& outImmediatePriority);
SpannedDeque<GetReadVersionRequest>& outDefaultPriority);
void addRequest(GetReadVersionRequest);
};