From 80c2848af66988935fcf2f142b571e28e9bb3643 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Mon, 24 Feb 2020 09:52:31 -0800 Subject: [PATCH 1/4] Change the algorithm for the proxy handing out read versions to improve performance and increase responsiveness to changes in workload. --- fdbserver/Knobs.cpp | 3 +- fdbserver/Knobs.h | 3 +- fdbserver/MasterProxyServer.actor.cpp | 113 +++++++++++++++++--------- 3 files changed, 79 insertions(+), 40 deletions(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index 5382f37c51..90adc9cd84 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -301,6 +301,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, 0.001 ); init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 100000 ); init( START_TRANSACTION_MAX_REQUESTS_TO_START, 10000 ); + init( START_TRANSACTION_RATE_WINDOW, 2.0 ); + init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 ); init( COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE, 0.0005 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE = 0.005; init( COMMIT_TRANSACTION_BATCH_INTERVAL_MIN, 0.001 ); if( randomize && BUGGIFY ) COMMIT_TRANSACTION_BATCH_INTERVAL_MIN = 0.1; @@ -318,7 +320,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula init( COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE, 100000 ); init( COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER, 0.0 ); - init( TRANSACTION_BUDGET_TIME, 0.050 ); if( randomize && BUGGIFY ) TRANSACTION_BUDGET_TIME = 0.0; init( RESOLVER_COALESCE_TIME, 1.0 ); init( BUGGIFIED_ROW_LIMIT, APPLY_MUTATION_BYTES ); if( randomize && BUGGIFY ) BUGGIFIED_ROW_LIMIT = deterministicRandom()->randomInt(3, 30); init( PROXY_SPIN_DELAY, 0.01 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 144b5eb142..c95b4e0f77 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -246,6 +246,8 @@ public: double START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL; double START_TRANSACTION_MAX_TRANSACTIONS_TO_START; int START_TRANSACTION_MAX_REQUESTS_TO_START; + double START_TRANSACTION_RATE_WINDOW; + double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET; double COMMIT_TRANSACTION_BATCH_INTERVAL_FROM_IDLE; double COMMIT_TRANSACTION_BATCH_INTERVAL_MIN; @@ -261,7 +263,6 @@ public: double COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL; double COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR; - double TRANSACTION_BUDGET_TIME; double RESOLVER_COALESCE_TIME; int BUGGIFIED_ROW_LIMIT; double PROXY_SPIN_DELAY; diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 76fb0ad7ce..2aad5f149b 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -94,8 +94,61 @@ struct ProxyStats { } }; -ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate, - double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) { +struct TransactionRateInfo { + double rate; + double limit; + double budget; + + bool disabled; + + Smoother smoothRate; + Smoother smoothReleased; + + TransactionRateInfo(double rate) : rate(rate), limit(0), budget(0), disabled(true), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW), + smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {} + + void reset(double elapsed) { + double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate(); + limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate; + } + + bool canStart(int64_t numAlreadyStarted, int64_t count) { + return numAlreadyStarted + count <= std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START); + } + + void updateBudget(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) { + budget = std::max(0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW); + + if(queueEmptyAtPriority) { + budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET); + } + + smoothReleased.addDelta(numStartedAtPriority); + } + + void disable() { + disabled = true; + rate = 0; + smoothRate.reset(0); + } + + void setRate(double rate) { + ASSERT(rate != std::numeric_limits::infinity()); + + this->rate = rate; + if(disabled) { + smoothRate.reset(rate); + disabled = false; + } + else { + smoothRate.setTotal(rate); + } + } +}; + + +ACTOR Future getRate(UID myID, Reference> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, TransactionRateInfo *transactionRateInfo, + TransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) { state Future nextRequestTimer = Never(); state Future leaseTimeout = Never(); state Future reply = Never(); @@ -124,8 +177,9 @@ ACTOR Future getRate(UID myID, Reference> db, int64 } when ( GetRateInfoReply rep = wait(reply) ) { reply = Never(); - *outTransactionRate = rep.transactionRate; - *outBatchTransactionRate = rep.batchTransactionRate; + + transactionRateInfo->setRate(rep.transactionRate); + batchTransactionRateInfo->setRate(rep.batchTransactionRate); //TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC); lastTC = *inTransactionCount; leaseTimeout = delay(rep.leaseDuration); @@ -137,35 +191,15 @@ ACTOR Future getRate(UID myID, Reference> db, int64 } } when ( wait( leaseTimeout ) ) { - *outTransactionRate = 0; - *outBatchTransactionRate = 0; - //TraceEvent("MasterProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", "Expired"); + transactionRateInfo->disable(); + batchTransactionRateInfo->disable(); + TraceEvent(SevWarn, "MasterProxyRateLeaseExpired", myID).suppressFor(5.0); + //TraceEvent("MasterProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", 0); leaseTimeout = Never(); } } } -struct TransactionRateInfo { - double rate; - double limit; - - TransactionRateInfo(double rate) : rate(rate), limit(0) {} - - void reset(double elapsed) { - limit = std::min(0.0, limit) + rate * elapsed; // Adjust the limit based on the full elapsed interval in order to properly erase a deficit - limit = std::min(limit, rate * SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MAX); // Don't allow the rate to exceed what would be allowed in the maximum batch interval - limit = std::min(limit, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START); - } - - bool canStart(int64_t numAlreadyStarted) { - return numAlreadyStarted < limit; - } - - void updateBudget(int64_t numStarted) { - limit -= numStarted; - } -}; - ACTOR Future queueTransactionStartRequests( Reference> db, std::priority_queue, @@ -1240,7 +1274,7 @@ ACTOR static Future transactionStarter( state vector otherProxies; state PromiseStream replyTimes; - addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply)); + addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply)); addActor.send(queueTransactionStartRequests(db, &transactionQueue, proxy.getConsistentReadVersion.getFuture(), GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(), &commitData->stats, &batchRateInfo)); @@ -1282,13 +1316,12 @@ ACTOR static Future transactionStarter( auto& req = transactionQueue.top().first; int tc = req.transactionCount; - if (req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && - !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) { - break; - } else if (req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && - !normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) { + if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1], tc)) { break; } + else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1], tc)) { + break; + } if (req.debugID.present()) { if (!debugID.present()) debugID = nondeterministicRandom()->randomUniqueID(); @@ -1323,11 +1356,15 @@ ACTOR static Future transactionStarter( .detail("TransactionBudget", transactionBudget) .detail("BatchTransactionBudget", batchTransactionBudget);*/ - transactionCount += transactionsStarted[0] + transactionsStarted[1]; - batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1]; + int systemTotalStarted = systemTransactionsStarted[0] + systemTransactionsStarted[1]; + int normalTotalStarted = defaultPriTransactionsStarted[0] + defaultPriTransactionsStarted[1]; + int batchTotalStarted = batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1]; - normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]); - batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]); + transactionCount += transactionsStarted[0] + transactionsStarted[1]; + batchTransactionCount += batchTotalStarted; + + normalRateInfo.updateBudget(systemTotalStarted + normalTotalStarted, transactionQueue.empty() || transactionQueue.top().first.priority() < GetReadVersionRequest::PRIORITY_DEFAULT, elapsed); + batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, transactionQueue.empty(), elapsed); if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast"); From 84b8f7ce9bc39c9f988f4d66b36a796ee3e91d37 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Mon, 24 Feb 2020 13:22:29 -0800 Subject: [PATCH 2/4] Add release notes. --- documentation/sphinx/source/release-notes.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/documentation/sphinx/source/release-notes.rst b/documentation/sphinx/source/release-notes.rst index 4061a9afc8..97f51b9814 100644 --- a/documentation/sphinx/source/release-notes.rst +++ b/documentation/sphinx/source/release-notes.rst @@ -8,6 +8,9 @@ Release Notes Performance ----------- +* Improve GRV tail latencies, particularly as the transaction rate gets nearer the ratekeeper limit. `(PR #2735) `_ +* The proxies are now more responsive to changes in workload when unthrottling lower priority transactions. `(PR #2735) `_ + Fixes ----- From 1ec55f21f707330edef34b75a9cf85ae46619ac4 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Fri, 28 Feb 2020 14:25:19 -0800 Subject: [PATCH 3/4] Add some comments for documentation; strengthen an assert. --- fdbserver/MasterProxyServer.actor.cpp | 29 +++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index 2aad5f149b..b973c2e818 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -107,7 +107,15 @@ struct TransactionRateInfo { TransactionRateInfo(double rate) : rate(rate), limit(0), budget(0), disabled(true), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW), smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {} - void reset(double elapsed) { + void reset() { + // Determine the number of transactions that this proxy is allowed to release + // Roughly speaking, this is done by computing the number of transactions over some historical window that we could + // have started but didn't, and making that our limit. More precisely, we track a smoothed rate limit and release rate, + // the difference of which is the rate of additional transactions that we could have released based on that window. + // Then we multiply by the window size to get a number of transactions. + // + // Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the use of + // our budget or because of higher priority transactions). double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate(); limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate; } @@ -117,8 +125,21 @@ struct TransactionRateInfo { } void updateBudget(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) { + // Update the budget to accumulate any extra capacity available or remove any excess that was used. + // The actual delta is the portion of the limit we didn't use multiplied by the fraction of the window that elapsed. + // + // We may have exceeded our limit due to the budget or because of higher priority transactions, in which case this + // delta will be negative. The delta can also be negative in the event that our limit was negative, which can happen + // if we had already started more transactions in our window than our rate would have allowed. + // + // This budget has the property that when the budget is required to start transactions (because batches are big), + // the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the batch size + // upon starting a batch. In other words, this works equivalently to a model where we linearly accumulate budget over + // time in the case that our batches are too big to take advantage of the window based limits. budget = std::max(0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW); + // If we are emptying out the queue of requests, then we don't need to carry much budget forward + // If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised if(queueEmptyAtPriority) { budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET); } @@ -133,7 +154,7 @@ struct TransactionRateInfo { } void setRate(double rate) { - ASSERT(rate != std::numeric_limits::infinity()); + ASSERT(rate >= 0 && rate != std::numeric_limits::infinity() && !isnan(rate)); this->rate = rate; if(disabled) { @@ -1300,8 +1321,8 @@ ACTOR static Future transactionStarter( if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminant multiplication with infinite transaction rate - normalRateInfo.reset(elapsed); - batchRateInfo.reset(elapsed); + normalRateInfo.reset(); + batchRateInfo.reset(); int transactionsStarted[2] = {0,0}; int systemTransactionsStarted[2] = {0,0}; From 541c81a92af33437125fc7bf3e87dad2604dd038 Mon Sep 17 00:00:00 2001 From: "A.J. Beamon" Date: Tue, 14 Apr 2020 14:10:12 -0700 Subject: [PATCH 4/4] Fix merge related issue. --- fdbserver/MasterProxyServer.actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fdbserver/MasterProxyServer.actor.cpp b/fdbserver/MasterProxyServer.actor.cpp index cb80c49073..af9823a1fe 100644 --- a/fdbserver/MasterProxyServer.actor.cpp +++ b/fdbserver/MasterProxyServer.actor.cpp @@ -1488,8 +1488,8 @@ ACTOR static Future transactionStarter( transactionCount += transactionsStarted[0] + transactionsStarted[1]; batchTransactionCount += batchTotalStarted; - normalRateInfo.updateBudget(systemTotalStarted + normalTotalStarted, transactionQueue.empty() || transactionQueue.top().first.priority() < GetReadVersionRequest::PRIORITY_DEFAULT, elapsed); - batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, transactionQueue.empty(), elapsed); + normalRateInfo.updateBudget(systemTotalStarted + normalTotalStarted, systemQueue.empty() && defaultQueue.empty(), elapsed); + batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(), elapsed); if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");