diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 789b2973b1..c25df2cd1a 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -24,11 +24,12 @@ #include "fdbclient/Notified.h" #include "fdbclient/TransactionLineage.h" #include "fdbclient/Tuple.h" -#include "fdbserver/LogSystem.h" -#include "fdbserver/LogSystemDiskQueueAdapter.h" #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/GrvProxyInterface.h" #include "fdbclient/VersionVector.h" +#include "fdbserver/GrvTransactionRateInfo.h" +#include "fdbserver/LogSystem.h" +#include "fdbserver/LogSystemDiskQueueAdapter.h" #include "fdbserver/WaitFailure.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbrpc/sim_validation.h" @@ -158,83 +159,6 @@ struct GrvProxyStats { } }; -struct GrvTransactionRateInfo { - double rate; - double limit; - double budget; - - bool disabled; - - Smoother smoothRate; - Smoother smoothReleased; - - GrvTransactionRateInfo(double rate = 0.0) - : rate(rate), limit(0), budget(0), disabled(true), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW), - smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {} - - void reset() { - // Determine the number of transactions that this proxy is allowed to release - // Roughly speaking, this is done by computing the number of transactions over some historical window that we - // could have started but didn't, and making that our limit. More precisely, we track a smoothed rate limit and - // release rate, the difference of which is the rate of additional transactions that we could have released - // based on that window. Then we multiply by the window size to get a number of transactions. - // - // Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the - // use of our budget or because of higher priority transactions). - double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate(); - limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate; - } - - bool canStart(int64_t numAlreadyStarted, int64_t count) const { - return numAlreadyStarted + count <= - std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START); - } - - void updateBudget(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) { - // Update the budget to accumulate any extra capacity available or remove any excess that was used. - // The actual delta is the portion of the limit we didn't use multiplied by the fraction of the window that - // elapsed. - // - // We may have exceeded our limit due to the budget or because of higher priority transactions, in which case - // this delta will be negative. The delta can also be negative in the event that our limit was negative, which - // can happen if we had already started more transactions in our window than our rate would have allowed. - // - // This budget has the property that when the budget is required to start transactions (because batches are - // big), the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the - // batch size upon starting a batch. In other words, this works equivalently to a model where we linearly - // accumulate budget over time in the case that our batches are too big to take advantage of the window based - // limits. - budget = std::max( - 0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW); - - // If we are emptying out the queue of requests, then we don't need to carry much budget forward - // If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised - if (queueEmptyAtPriority) { - budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET); - } - - smoothReleased.addDelta(numStartedAtPriority); - } - - void disable() { - disabled = true; - // Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately. - smoothRate.setTotal(0); - } - - void setRate(double rate) { - ASSERT(rate >= 0 && rate != std::numeric_limits::infinity() && !std::isnan(rate)); - - this->rate = rate; - if (disabled) { - smoothRate.reset(rate); - disabled = false; - } else { - smoothRate.setTotal(rate); - } - } -}; - struct GrvProxyData { GrvProxyInterface proxy; UID dbgid; @@ -622,7 +546,7 @@ ACTOR Future queueGetReadVersionRequests( } else { // Return error for batch_priority GRV requests int64_t proxiesCount = std::max((int)db->get().client.grvProxies.size(), 1); - if (batchRateInfo->rate <= (1.0 / proxiesCount)) { + if (batchRateInfo->getRate() <= (1.0 / proxiesCount)) { req.reply.sendError(batch_transaction_throttled()); stats->txnThrottled += req.transactionCount; } else { @@ -960,11 +884,11 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, elapsed = 1e-15; } - normalRateInfo.reset(); - batchRateInfo.reset(); + normalRateInfo.startEpoch(); + batchRateInfo.startEpoch(); - grvProxyData->stats.transactionLimit = normalRateInfo.limit; - grvProxyData->stats.batchTransactionLimit = batchRateInfo.limit; + grvProxyData->stats.transactionLimit = normalRateInfo.getLimit(); + grvProxyData->stats.batchTransactionLimit = batchRateInfo.getLimit(); int transactionsStarted[2] = { 0, 0 }; int systemTransactionsStarted[2] = { 0, 0 }; @@ -1071,11 +995,11 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, transactionCount += transactionsStarted[0] + transactionsStarted[1]; batchTransactionCount += batchTotalStarted; - normalRateInfo.updateBudget( + normalRateInfo.endEpoch( systemTotalStarted + normalTotalStarted, systemQueue.empty() && defaultQueue.empty(), elapsed); - batchRateInfo.updateBudget(systemTotalStarted + normalTotalStarted + batchTotalStarted, - systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(), - elapsed); + batchRateInfo.endEpoch(systemTotalStarted + normalTotalStarted + batchTotalStarted, + systemQueue.empty() && defaultQueue.empty() && batchQueue.empty(), + elapsed); if (debugID.present()) { g_traceBatch.addEvent("TransactionDebug", diff --git a/fdbserver/GrvTransactionRateInfo.actor.cpp b/fdbserver/GrvTransactionRateInfo.actor.cpp new file mode 100644 index 0000000000..b676f3afb1 --- /dev/null +++ b/fdbserver/GrvTransactionRateInfo.actor.cpp @@ -0,0 +1,118 @@ +/* + * GrvTransactionRateInfo.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbserver/GrvTransactionRateInfo.h" + +#include "fdbserver/Knobs.h" +#include "flow/UnitTest.h" +#include "flow/actorcompiler.h" // must be last include + +GrvTransactionRateInfo::GrvTransactionRateInfo(double rate) + : rate(rate), smoothRate(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW), + smoothReleased(SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW) {} + +bool GrvTransactionRateInfo::canStart(int64_t numAlreadyStarted, int64_t count) const { + return numAlreadyStarted + count <= + std::min(limit + budget, SERVER_KNOBS->START_TRANSACTION_MAX_TRANSACTIONS_TO_START); +} + +void GrvTransactionRateInfo::endEpoch(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed) { + // Update the budget to accumulate any extra capacity available or remove any excess that was used. + // The actual delta is the portion of the limit we didn't use multiplied by the fraction of the window that + // elapsed. + // + // We may have exceeded our limit due to the budget or because of higher priority transactions, in which case + // this delta will be negative. The delta can also be negative in the event that our limit was negative, which + // can happen if we had already started more transactions in our window than our rate would have allowed. + // + // This budget has the property that when the budget is required to start transactions (because batches are + // big), the sum limit+budget will increase linearly from 0 to the batch size over time and decrease by the + // batch size upon starting a batch. In other words, this works equivalently to a model where we linearly + // accumulate budget over time in the case that our batches are too big to take advantage of the window based + // limits. + budget = + std::max(0.0, budget + elapsed * (limit - numStartedAtPriority) / SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW); + + // If we are emptying out the queue of requests, then we don't need to carry much budget forward + // If we did keep accumulating budget, then our responsiveness to changes in workflow could be compromised + if (queueEmptyAtPriority) { + budget = std::min(budget, SERVER_KNOBS->START_TRANSACTION_MAX_EMPTY_QUEUE_BUDGET); + } + + smoothReleased.addDelta(numStartedAtPriority); +} + +void GrvTransactionRateInfo::disable() { + disabled = true; + // Use smoothRate.setTotal(0) instead of setting rate to 0 so txns will not be throttled immediately. + smoothRate.setTotal(0); +} + +void GrvTransactionRateInfo::setRate(double rate) { + ASSERT(rate >= 0 && rate != std::numeric_limits::infinity() && !std::isnan(rate)); + + this->rate = rate; + if (disabled) { + smoothRate.reset(rate); + disabled = false; + } else { + smoothRate.setTotal(rate); + } +} + +void GrvTransactionRateInfo::startEpoch() { + // Determine the number of transactions that this proxy is allowed to release + // Roughly speaking, this is done by computing the number of transactions over some historical window that we + // could have started but didn't, and making that our limit. More precisely, we track a smoothed rate limit and + // release rate, the difference of which is the rate of additional transactions that we could have released + // based on that window. Then we multiply by the window size to get a number of transactions. + // + // Limit can be negative in the event that we are releasing more transactions than we are allowed (due to the + // use of our budget or because of higher priority transactions). + double releaseRate = smoothRate.smoothTotal() - smoothReleased.smoothRate(); + limit = SERVER_KNOBS->START_TRANSACTION_RATE_WINDOW * releaseRate; +} + +static bool isNear(double desired, int64_t actual) { + return std::abs(desired - actual) * 10 < desired; +} + +ACTOR static Future mockClient(GrvTransactionRateInfo* rateInfo, double desiredRate, int64_t* counter) { + loop { + state double elapsed = (0.9 + 0.2 * deterministicRandom()->random01()) / desiredRate; + wait(delay(elapsed)); + rateInfo->startEpoch(); + int started = rateInfo->canStart(0, 1) ? 1 : 0; + *counter += started; + rateInfo->endEpoch(started, false, elapsed); + } +} + +// Rate limit set at 10, but client attempts 20 transactions per second. +// Client should be throttled to only 10 transactions per second. +TEST_CASE("/GrvTransactionRateInfo/Simple") { + state GrvTransactionRateInfo rateInfo; + state int64_t counter; + rateInfo.setRate(10.0); + wait(timeout(mockClient(&rateInfo, 20.0, &counter), 60.0, Void())); + TraceEvent("GrvTransactionRateInfoTest").detail("Counter", counter); + ASSERT(isNear(60.0 * 20.0, counter)); + return Void(); +} diff --git a/fdbserver/include/fdbserver/GrvTransactionRateInfo.h b/fdbserver/include/fdbserver/GrvTransactionRateInfo.h new file mode 100644 index 0000000000..9b4a553923 --- /dev/null +++ b/fdbserver/include/fdbserver/GrvTransactionRateInfo.h @@ -0,0 +1,69 @@ +/* + * GrvTransactionRateInfo.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "fdbrpc/Smoother.h" + +// Used by GRV Proxy to enforce rate limits received from the Ratekeeper. +// +// Between delays, the GrvTransactionRateInfo executes an "epoch" starting +// with a call to the startEpoch method. Within this epoch, transactions are +// released while canStart returns true. At the end of the epoch, the +// endEpoch method is called, and the budget is updated to add or +// remove capacity. +// +// Meanwhile, the desired rate is updated through the setRate method. +// +// Smoothers are used to avoid turbulent throttling behaviour. +class GrvTransactionRateInfo { + double rate{ 0.0 }; + double limit{ 0.0 }; + double budget{ 0.0 }; + bool disabled{ true }; + Smoother smoothRate; + Smoother smoothReleased; + +public: + explicit GrvTransactionRateInfo(double rate = 0.0); + + // Determines the number of transactions that this proxy is allowed to release + // in this epoch. + void startEpoch(); + + // Checks if a "count" new transactions can be released, given that + // "numAlreadyStarted" transactions have already been released in the + // current epoch. + bool canStart(int64_t numAlreadyStarted, int64_t count) const; + + // Updates the budget to accumulate any extra capacity available or remove any excess that was used. + // Call at the end of an epoch. + void endEpoch(int64_t numStartedAtPriority, bool queueEmptyAtPriority, double elapsed); + + // Smoothly sets rate. If currently disabled, reenable + void setRate(double rate); + + // Smoothly sets transaction rate to 0. Call disable when new rates have not been + // set for a sufficiently long period of time. + void disable(); + + double getRate() const { return rate; } + double getLimit() const { return limit; } +};