Merge pull request #2735 from ajbeamon/grv-proxy-perf-improvements

Improve proxy GRV latencies and responsiveness
This commit is contained in:
Evan Tschannen 2020-04-14 16:45:48 -07:00 committed by GitHub
commit dd951afe92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 42 deletions

View File

@ -12,6 +12,9 @@ Features
Performance
-----------
* Improve GRV tail latencies, particularly as the transaction rate gets nearer the ratekeeper limit. `(PR #2735) <https://github.com/apple/foundationdb/pull/2735>`_
* The proxies are now more responsive to changes in workload when unthrottling lower priority transactions. `(PR #2735) <https://github.com/apple/foundationdb/pull/2735>`_
Fixes
-----

View File

@ -307,6 +307,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL, 0.001 );
init( START_TRANSACTION_MAX_TRANSACTIONS_TO_START, 100000 );
init( START_TRANSACTION_MAX_REQUESTS_TO_START, 10000 );
init( START_TRANSACTION_RATE_WINDOW, 2.0 );
init( START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET, 10.0 );
init( START_TRANSACTION_MAX_QUEUE_SIZE, 1e6 );
init( KEY_LOCATION_MAX_QUEUE_SIZE, 1e6 );
@ -326,7 +328,6 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( COMMIT_TRANSACTION_BATCH_BYTES_SCALE_BASE, 100000 );
init( COMMIT_TRANSACTION_BATCH_BYTES_SCALE_POWER, 0.0 );
init( TRANSACTION_BUDGET_TIME, 0.050 ); if( randomize && BUGGIFY ) TRANSACTION_BUDGET_TIME = 0.0;
init( RESOLVER_COALESCE_TIME, 1.0 );
init( BUGGIFIED_ROW_LIMIT, APPLY_MUTATION_BYTES ); if( randomize && BUGGIFY ) BUGGIFIED_ROW_LIMIT = deterministicRandom()->randomInt(3, 30);
init( PROXY_SPIN_DELAY, 0.01 );

View File

@ -248,6 +248,8 @@ public:
double START_TRANSACTION_BATCH_QUEUE_CHECK_INTERVAL;
double START_TRANSACTION_MAX_TRANSACTIONS_TO_START;
int START_TRANSACTION_MAX_REQUESTS_TO_START;
double START_TRANSACTION_RATE_WINDOW;
double START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET;
int START_TRANSACTION_MAX_QUEUE_SIZE;
int KEY_LOCATION_MAX_QUEUE_SIZE;
@ -265,7 +267,6 @@ public:
double COMMIT_BATCHES_MEM_FRACTION_OF_TOTAL;
double COMMIT_BATCHES_MEM_TO_TOTAL_MEM_SCALE_FACTOR;
double TRANSACTION_BUDGET_TIME;
double RESOLVER_COALESCE_TIME;
int BUGGIFIED_ROW_LIMIT;
double PROXY_SPIN_DELAY;

View File

@ -97,8 +97,82 @@ struct ProxyStats {
}
};
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, double* outTransactionRate,
double* outBatchTransactionRate, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
// Bookkeeping for how many transactions may be started at one priority level.
// Each batch may start up to `limit + budget` transactions (see canStart()): `limit` is recomputed from a
// smoothed history by reset(), and `budget` carries unused allowance forward via updateBudget().
struct TransactionRateInfo {
    double rate;    // last value handed to setRate(); zeroed by disable()
    double limit;   // window-based allowance computed by reset(); may be negative
    double budget;  // extra allowance carried between batches (see updateBudget())
    bool disabled;  // true after construction and after disable(); cleared by setRate()

    Smoother smoothRate;      // fed by setRate() / disable()
    Smoother smoothReleased;  // fed by updateBudget() with the number of transactions started

    TransactionRateInfo(double rate)
      : rate(rate), limit(0), budget(0), disabled(true),
        smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW),
        smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {}

    // Recompute `limit`, the number of transactions this proxy is allowed to release.
    //
    // Conceptually: over a historical window, how many transactions could have been started but were not?
    // We keep a smoothed rate limit and a smoothed release rate; their difference is the rate of additional
    // transactions that could have been released, and scaling by the window length turns that rate into a
    // transaction count.
    //
    // The result is negative when more transactions were released than the rate allowed, which can happen
    // because budget was spent or because higher priority transactions were started.
    void reset() {
        const double window = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW;
        const double unusedRate = smoothRate.smoothTotal() - smoothReleased.smoothRate();
        limit = window * unusedRate;
    }

    // True when `count` more transactions fit on top of `numAlreadyStarted` without exceeding
    // limit + budget, itself capped at START_TRANSACTION_MAX_TRANSACTIONS_TO_START.
    bool canStart(int64_t numAlreadyStarted, int64_t count) {
        const double allowance = std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);
        return numAlreadyStarted + count <= allowance;
    }

    // Fold the outcome of a batch into `budget` and record the released transactions.
    //
    // The budget changes by the part of the limit that went unused, scaled by the fraction of the window
    // that elapsed. That change is negative when the limit was exceeded (budget was spent, or higher
    // priority transactions were started) and also when the limit itself was negative because more
    // transactions had already been started in the window than the rate permits.
    //
    // When batches are too large for the window-based limit alone, limit + budget climbs linearly from 0
    // up to the batch size and then drops by the batch size once the batch starts, i.e. it behaves like
    // a budget that accumulates linearly over time.
    void updateBudget(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) {
        const double delta = elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW;
        budget = std::max(0.0, budget + delta);

        if (queueEmptyAtPriority) {
            // The request queue is being drained, so little budget needs to be carried forward; letting
            // it keep growing could make us slow to react to changes in workload.
            budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET);
        }

        smoothReleased.addDelta(numStartedAtPriority);
    }

    // Stop granting rate: zero the rate and reset its smoother until setRate() is called again.
    void disable() {
        rate = 0;
        disabled = true;
        smoothRate.reset(0);
    }

    // Install a new rate, which must be finite and non-negative.
    void setRate(double rate) {
        ASSERT(rate >= 0 && rate != std::numeric_limits<double>::infinity() && !isnan(rate));

        this->rate = rate;
        if (!disabled) {
            smoothRate.setTotal(rate);
            return;
        }

        // Coming back from the disabled state: reset the smoother to the new rate rather than
        // setting a new total on it.
        smoothRate.reset(rate);
        disabled = false;
    }
};
ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64_t* inTransactionCount, int64_t* inBatchTransactionCount, TransactionRateInfo *transactionRateInfo,
TransactionRateInfo *batchTransactionRateInfo, GetHealthMetricsReply* healthMetricsReply, GetHealthMetricsReply* detailedHealthMetricsReply) {
state Future<Void> nextRequestTimer = Never();
state Future<Void> leaseTimeout = Never();
state Future<GetRateInfoReply> reply = Never();
@ -127,8 +201,9 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
when ( GetRateInfoReply rep = wait(reply) ) {
reply = Never();
*outTransactionRate = rep.transactionRate;
*outBatchTransactionRate = rep.batchTransactionRate;
transactionRateInfo->setRate(rep.transactionRate);
batchTransactionRateInfo->setRate(rep.batchTransactionRate);
//TraceEvent("MasterProxyRate", myID).detail("Rate", rep.transactionRate).detail("BatchRate", rep.batchTransactionRate).detail("Lease", rep.leaseDuration).detail("ReleasedTransactions", *inTransactionCount - lastTC);
lastTC = *inTransactionCount;
leaseTimeout = delay(rep.leaseDuration);
@ -140,35 +215,15 @@ ACTOR Future<Void> getRate(UID myID, Reference<AsyncVar<ServerDBInfo>> db, int64
}
}
when ( wait( leaseTimeout ) ) {
*outTransactionRate = 0;
*outBatchTransactionRate = 0;
//TraceEvent("MasterProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", "Expired");
transactionRateInfo->disable();
batchTransactionRateInfo->disable();
TraceEvent(SevWarn, "MasterProxyRateLeaseExpired", myID).suppressFor(5.0);
//TraceEvent("MasterProxyRate", myID).detail("Rate", 0.0).detail("BatchRate", 0.0).detail("Lease", 0);
leaseTimeout = Never();
}
}
}
// Per-priority allowance for starting transactions: `rate` is the permitted rate and `limit` is the
// number of transactions that may still be started in the current batch.
struct TransactionRateInfo {
    double rate;
    double limit;

    TransactionRateInfo(double rate) : rate(rate), limit(0) {}

    // Recompute `limit` for a batch covering `elapsed`.
    void reset(double elapsed) {
        // Only a deficit (negative limit) is carried over; the whole elapsed interval is credited so
        // that the deficit is properly erased.
        double refreshed = std::min(0.0, limit) + rate * elapsed;

        // Never allow more than the rate would grant over the maximum batch interval.
        refreshed = std::min(refreshed, rate * SERVER_KNOBS->START_TRANSACTION_BATCH_INTERVAL_MAX);

        // Absolute cap on the number of transactions started at once.
        refreshed = std::min(refreshed, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START);

        limit = refreshed;
    }

    // True while fewer than `limit` transactions have been started.
    bool canStart(int64_t numAlreadyStarted) {
        return limit > numAlreadyStarted;
    }

    // Charge the transactions that were started against the limit.
    void updateBudget(int64_t numStarted) {
        limit = limit - numStarted;
    }
};
ACTOR Future<Void> queueTransactionStartRequests(
Reference<AsyncVar<ServerDBInfo>> db,
Deque<GetReadVersionRequest> *systemQueue,
@ -1331,7 +1386,7 @@ ACTOR static Future<Void> transactionStarter(
state vector<MasterProxyInterface> otherProxies;
state PromiseStream<double> replyTimes;
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo.rate, &batchRateInfo.rate, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(getRate(proxy.id(), db, &transactionCount, &batchTransactionCount, &normalRateInfo, &batchRateInfo, healthMetricsReply, detailedHealthMetricsReply));
addActor.send(queueTransactionStartRequests(db, &systemQueue, &defaultQueue, &batchQueue, proxy.getConsistentReadVersion.getFuture(),
GRVTimer, &lastGRVTime, &GRVBatchTime, replyTimes.getFuture(),
&commitData->stats, &batchRateInfo));
@ -1357,8 +1412,8 @@ ACTOR static Future<Void> transactionStarter(
if(elapsed == 0) elapsed = 1e-15; // resolve a possible indeterminate multiplication with infinite transaction rate
normalRateInfo.reset(elapsed);
batchRateInfo.reset(elapsed);
normalRateInfo.reset();
batchRateInfo.reset();
int transactionsStarted[2] = {0,0};
int systemTransactionsStarted[2] = {0,0};
@ -1385,13 +1440,12 @@ ACTOR static Future<Void> transactionStarter(
auto& req = transactionQueue->front();
int tc = req.transactionCount;
if (req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT &&
!batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
break;
} else if (req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE &&
!normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1])) {
if(req.priority() < GetReadVersionRequest::PRIORITY_DEFAULT && !batchRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1], tc)) {
break;
}
else if(req.priority() < GetReadVersionRequest::PRIORITY_SYSTEM_IMMEDIATE && !normalRateInfo.canStart(transactionsStarted[0] + transactionsStarted[1], tc)) {
break;
}
if (req.debugID.present()) {
if (!debugID.present()) debugID = nondeterministicRandom()->randomUniqueID();
@ -1427,11 +1481,15 @@ ACTOR static Future<Void> transactionStarter(
.detail("TransactionBudget", transactionBudget)
.detail("BatchTransactionBudget", batchTransactionBudget);*/
transactionCount += transactionsStarted[0] + transactionsStarted[1];
batchTransactionCount += batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
int systemTotalStarted = systemTransactionsStarted[0] + systemTransactionsStarted[1];
int normalTotalStarted = defaultPriTransactionsStarted[0] + defaultPriTransactionsStarted[1];
int batchTotalStarted = batchPriTransactionsStarted[0] + batchPriTransactionsStarted[1];
normalRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
batchRateInfo.updateBudget(transactionsStarted[0] + transactionsStarted[1]);
transactionCount += transactionsStarted[0] + transactionsStarted[1];
batchTransactionCount += batchTotalStarted;
normalRateInfo.updateBudget(systemTotalStarted + normalTotalStarted, systemQueue.empty() && defaultQueue.empty(), elapsed);
batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(), elapsed);
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");