use a Deque for each priority instead of a priority queue to improve CPU with large numbers of outstanding requests

This commit is contained in:
Evan Tschannen 2020-03-13 18:07:48 -07:00
parent 243c268d9d
commit ebbf4490b3
1 changed files with 37 additions and 19 deletions

View File

@ -136,13 +136,14 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
ACTOR Future<Void> queueTransactionStartRequests(
std::priority_queue< std::pair<GetReadVersionRequest, int64_t>, std::vector< std::pair<GetReadVersionRequest, int64_t> > > *transactionQueue,
Deque<GetReadVersionRequest> *systemQueue,
Deque<GetReadVersionRequest> *defaultQueue,
Deque<GetReadVersionRequest> *batchQueue,
FutureStream<GetReadVersionRequest> readVersionRequests,
PromiseStream<Void> GRVTimer, double *lastGRVTime,
double *GRVBatchTime, FutureStream<double> replyTimes,
ProxyStats* stats)
{
state int64_t counter = 0;
loop choose{
when(GetReadVersionRequest req = waitNext(readVersionRequests)) {
if( stats->txnRequestIn.getValue() - stats->txnRequestOut.getValue() > SERVER_KNOBS->START_TRANSACTION_MAX_QUEUE_SIZE ) {
@ -156,20 +157,22 @@ ACTOR Future<Void> queueTransactionStartRequests(
if (req.debugID.present())
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE)
stats->txnSystemPriorityStartIn += req.transactionCount;
else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT)
stats->txnDefaultPriorityStartIn += req.transactionCount;
else
stats->txnBatchPriorityStartIn += req.transactionCount;
if (transactionQueue->empty()) {
if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) {
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
}
transactionQueue->push(std::make_pair(req, counter--));
++stats->txnRequestIn;
stats->txnStartIn += req.transactionCount;
if (req.priority() >= GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE) {
stats->txnSystemPriorityStartIn += req.transactionCount;
systemQueue->push_back(req);
} else if (req.priority() >= GetReadVersionRequest::PRIORITY_DEFAULT) {
stats->txnDefaultPriorityStartIn += req.transactionCount;
defaultQueue->push_back(req);
} else {
stats->txnBatchPriorityStartIn += req.transactionCount;
batchQueue->push_back(req);
}
}
}
// dynamic batching monitors reply latencies
@ -1230,12 +1233,14 @@ ACTOR static Future<Void> transactionStarter(
state TransactionRateInfo normalRateInfo(10);
state TransactionRateInfo batchRateInfo(0);
state std::priority_queue<std::pair<GetReadVersionRequest, int64_t>, std::vector<std::pair<GetReadVersionRequest, int64_t>>> transactionQueue;
state Deque<GetReadVersionRequest> systemQueue,
state Deque<GetReadVersionRequest> defaultQueue,
state Deque<GetReadVersionRequest> batchQueue,
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(queueTransactionStartRequests(&transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
addActor.send(queueTransactionStartRequests(&systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats));
// Get a list of the other proxies that go together with us
while (std::find(db->get().client.proxies.begin(), db->get().client.proxies.end(), proxy) == db->get().client.proxies.end())
@ -1270,8 +1275,20 @@ ACTOR static Future<Void> transactionStarter(
Optional<UID> debugID;
int requestsToStart = 0;
while (!transactionQueue.empty() && requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
auto& req = transactionQueue.top().first;
while (requestsToStart < SERVER_KNOBS->START_TRANSACTION_MAX_REQUESTS_TO_START) {
Deque<GetReadVersionRequest>* transactionQueue;
if(!systemQueue.empty()) {
transactionQueue = &systemQueue;
} else if(!defaultQueue.empty()) {
transactionQueue = &defaultQueue;
} else if(!batchQueue.empty()) {
transactionQueue = &batchQueue;
} else {
break;
}
auto& req = transactionQueue->front();
int tc = req.transactionCount;
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
@ -1295,12 +1312,13 @@ ACTOR static Future<Void> transactionStarter(
batchPriTransactionsStarted[req.flags & 1] += tc;
start[req.flags & 1].push_back(std::move(req)); static_assert(GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY == 1, "Implementation dependent on flag value");
transactionQueue.pop();
transactionQueue->pop_front();
requestsToStart++;
}
if (!transactionQueue.empty())
if (!systemQueue.empty() || !defaultQueue.empty() || !batchQueue.empty()) {
forwardPromise(GRVTimer, delayJittered(SERVER_KNOBS->START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, TaskPriority::ProxyGRVTimer));
}
/*TraceEvent("GRVBatch", proxy.id())
.detail("Elapsed", elapsed)