New GlobalTagThrottler implementation tracking more per-storage server metrics
This commit is contained in:
sfc-gh-tclinkenbeard 2022-07-10 23:22:58 -07:00
parent 6a63db08c8
commit e128c517d3
2 changed files with 344 additions and 179 deletions

View File

@ -27,189 +27,357 @@
#include "flow/actorcompiler.h" // must be last include
// In the function names below, several terms are used repeatedly. Their context-specific meanings are defined here:
// Cost: Every read or write operation has an associated cost, determined by the number of bytes accessed.
// Global tag throttling quotas are specified in terms of the amount of this cost that can be consumed
// per second. In the global tag throttler, cost refers to the per second rate of cost consumption.
// TPS: Transactions per second. Quotas are not specified in terms of TPS, but the limits given to clients must
// be specified in terms of TPS because throttling is performed at the front end of transactions (before costs are
// known).
// Total: Refers to the total quota specified by clients through the global tag throttling API. The sum of the
// costs of all operations (cluster-wide) with a particular tag cannot exceed the tag's specified total quota,
// even if the cluster has no saturated processes.
// Desired TPS: Assuming that a tag is able to achieve its total quota, this is the TPS it would be able to perform.
// Reserved: Refers to the reserved quota specified by clients through the global tag throttling API. As long as the
// sum of the costs of all operations (cluster-wide) with a particular tag is not above the tag's
// specified reserved quota, the tag should not experience any throttling from the global tag throttler.
// Current [Cost|TPS]: Measuring the current throughput on the cluster, independent of any specified quotas.
// ThrottlingRatio: Based on the health of each storage server, a throttling ratio is provided,
// informing the global tag throttler what ratio of the current throughput can be maintained.
// Limiting [Cost|TPS]: Based on the health of storage servers, a limiting throughput may be enforced.
// Target [Cost|TPS]: Based on reserved, limiting, and desired throughputs, this is the target throughput
// that the global tag throttler aims to achieve (across all clients).
// PerClient TPS: Because the target throughput must be shared across multiple clients, and all clients must
// be given the same limits, a per-client limit is calculated based on the current and target throughputs.
class GlobalTagThrottlerImpl {
enum class LimitType { RESERVED, TOTAL };
enum class OpType { READ, WRITE };
class QuotaAndCounters {
template <class K, class V>
static Optional<V> get(std::unordered_map<K, V> const& m, K const& k) {
auto it = m.find(k);
if (it == m.end()) {
return {};
} else {
return it->second;
class ThroughputCounters {
Smoother readCost;
Smoother writeCost;
// Returns difference between new and current rates
double updateCost(double newCost, OpType opType) {
if (opType == OpType::READ) {
auto const currentReadCost = readCost.getTotal();
return newCost - currentReadCost;
} else {
auto const currentWriteCost = writeCost.getTotal();
return newCost - currentWriteCost;
double getCost(OpType opType) const {
if (opType == OpType::READ) {
return readCost.smoothTotal();
} else {
return writeCost.smoothTotal();
// Track various statistics per tag, aggregated across all storage servers
class PerTagStatistics {
Optional<ThrottleApi::TagQuotaValue> quota;
std::unordered_map<UID, double> ssToReadCostRate;
std::unordered_map<UID, double> ssToWriteCostRate;
Smoother totalReadCostRate;
Smoother totalWriteCostRate;
Smoother transactionCounter;
Smoother perClientRate;
Optional<double> getReadTPSLimit(Optional<double> maxDesiredCost) const {
if (totalReadCostRate.smoothTotal() > 0) {
auto const desiredReadCost =
maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalReadQuota)
: quota.get().totalReadQuota);
auto const averageCostPerTransaction =
totalReadCostRate.smoothTotal() / transactionCounter.smoothRate();
return desiredReadCost / averageCostPerTransaction;
} else {
return {};
Optional<double> getWriteTPSLimit(Optional<double> maxDesiredCost) const {
if (totalWriteCostRate.smoothTotal() > 0) {
auto const desiredWriteCost =
maxDesiredCost.present() ? std::min(maxDesiredCost.get(), quota.get().totalWriteQuota)
: quota.get().totalWriteQuota);
auto const averageCostPerTransaction = transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
return desiredWriteCost * averageCostPerTransaction;
} else {
return {};
explicit PerTagStatistics()
void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; }
Optional<ThrottleApi::TagQuotaValue> getQuota() const { return quota; }
Optional<ThrottleApi::TagQuotaValue> const &getQuota() const {
return quota;
void setQuota(ThrottleApi::TagQuotaValue quota) { this->quota = quota; }
double getReadCostRate() const {
return totalReadCostRate.smoothTotal();
double getWriteCostRate() const {
return totalWriteCostRate.smoothTotal();
void updateReadCostRate(UID ssId, double newReadCostRate) {
auto& currentReadCostRate = ssToReadCostRate[ssId];
auto diff = newReadCostRate - currentReadCostRate;
currentReadCostRate += diff;
void updateWriteCostRate(UID ssId, double newWriteCostRate) {
auto& currentWriteCostRate = ssToWriteCostRate[ssId];
auto diff = newWriteCostRate - currentWriteCostRate;
currentWriteCostRate += diff;
void clearQuota() { quota = {}; }
void addTransactions(int count) { transactionCounter.addDelta(count); }
Optional<double> getTargetTotalTPSLimit(Optional<double> maxReadCostRate, Optional<double> maxWriteCostRate) const {
if (!quota.present())
return {};
auto readLimit = getReadTPSLimit(maxReadCostRate);
auto writeLimit = getWriteTPSLimit(maxWriteCostRate);
double getTransactionRate() const { return transactionCounter.smoothRate(); }
if (!readLimit.present() && !writeLimit.present()) {
return {};
} else {
if (!readLimit.present()) {
return writeLimit.get();
} else if (!writeLimit.present()) {
return readLimit.get();
} else {
return std::min(readLimit.get(), writeLimit.get());
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit(Optional<double> maxReadCostRate, Optional<double> maxWriteCostRate) {
auto targetRate = getTargetTotalTPSLimit(maxReadCostRate, maxWriteCostRate);
if (targetRate.present() && transactionCounter.smoothRate() > 0) {
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit(Optional<double> targetCost) {
if (targetCost.present() && transactionCounter.smoothRate() > 0) {
auto newPerClientRate = std::max(
(targetRate.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal()));
(targetCost.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal()));
return ClientTagThrottleLimits(perClientRate.getTotal(), ClientTagThrottleLimits::NO_EXPIRATION);
} else {
return {};
void processTraceEvent(TraceEvent& te) const {
if (quota.present()) {
te.detail("ProvidedReadTPSLimit", getReadTPSLimit({}))
.detail("ProvidedWriteTPSLimit", getWriteTPSLimit({}))
.detail("ReadCostRate", totalReadCostRate.smoothTotal())
.detail("WriteCostRate", totalWriteCostRate.smoothTotal())
.detail("TotalReadQuota", quota.get().totalReadQuota)
.detail("ReservedReadQuota", quota.get().reservedReadQuota)
.detail("TotalWriteQuota", quota.get().totalWriteQuota)
.detail("ReservedWriteQuota", quota.get().reservedWriteQuota);
Database db;
UID id;
std::map<TransactionTag, QuotaAndCounters> trackedTags;
uint64_t throttledTagChangeId{ 0 };
Future<Void> traceActor;
Optional<double> throttlingRatio;
double getQuotaRatio(TransactionTagRef tag, OpType opType) const {
double sumQuota{ 0.0 };
double tagQuota{ 0.0 };
for (const auto &[tag2, quotaAndCounters] : trackedTags) {
if (!quotaAndCounters.getQuota().present()) {
int64_t quota{ 0 };
if (opType == OpType::READ) {
quota = quotaAndCounters.getQuota().get().totalReadQuota;
} else {
quota = quotaAndCounters.getQuota().get().totalWriteQuota;
sumQuota += quota;
if ( == 0) {
tagQuota = quota;
std::unordered_map<UID, Optional<double>> throttlingRatios;
std::unordered_map<TransactionTag, PerTagStatistics> tagStatistics;
std::unordered_map<UID, std::unordered_map<TransactionTag, ThroughputCounters>> throughput;
// Returns the cost rate for the given tag on the given storage server
Optional<double> getCurrentCost(UID storageServerId, TransactionTag tag, OpType opType) const {
auto const tagToThroughputCounters = get(throughput, storageServerId);
if (!tagToThroughputCounters.present()) {
return {};
if (tagQuota == 0) return 0;
ASSERT_GT(sumQuota, 0.0);
return tagQuota / sumQuota;
auto const throughputCounter = get(tagToThroughputCounters.get(), tag);
if (!throughputCounter.present()) {
return {};
return throughputCounter.get().getCost(opType);
// Returns the total cost rate (summed across all tags)
double getTotalCostRate(OpType opType) const {
double result{ 0 };
for (const auto &[tag, quotaAndCounters] : trackedTags) {
result +=
(opType == OpType::READ) ? quotaAndCounters.getReadCostRate() : quotaAndCounters.getWriteCostRate();
// Return the cost rate on the given storage server, summed across all tags
Optional<double> getCurrentCost(UID storageServerId, OpType opType) const {
auto tagToPerTagThroughput = get(throughput, storageServerId);
if (!tagToPerTagThroughput.present()) {
return {};
double result = 0;
for (const auto& [tag, perTagThroughput] : tagToPerTagThroughput.get()) {
result += perTagThroughput.getCost(opType);
return result;
ACTOR static Future<Void> tracer(GlobalTagThrottlerImpl const* self) {
loop {
for (const auto& [tag, quotaAndCounters] : self->trackedTags) {
TraceEvent te("GlobalTagThrottling");
te.detail("Tag", tag);
// Return the cost rate for the given tag, summed across all storage servers
double getCurrentCost(TransactionTag tag, OpType opType) const {
double result{ 0.0 };
for (const auto& [id, _] : throughput) {
result += getCurrentCost(id, tag, opType).orDefault(0);
return result;
// For transactions with the provided tag, returns the average cost that gets associated with the provided storage
// server
Optional<double> getAverageTransactionCost(TransactionTag tag, UID storageServerId, OpType opType) const {
auto const cost = getCurrentCost(storageServerId, tag, opType);
if (!cost.present()) {
return {};
auto const stats = get(tagStatistics, tag);
if (!stats.present()) {
return {};
auto const transactionRate = stats.get().getTransactionRate();
if (transactionRate == 0.0) {
return {};
} else {
return cost.get() / transactionRate;
// For transactions with the provided tag, returns the average cost
Optional<double> getAverageTransactionCost(TransactionTag tag, OpType opType) const {
auto const cost = getCurrentCost(tag, opType);
auto const stats = get(tagStatistics, tag);
if (!stats.present()) {
return {};
auto const transactionRate = stats.get().getTransactionRate();
if (transactionRate == 0.0) {
return {};
} else {
return cost / transactionRate;
// Returns the list of all tags performing meaningful work on the given storage server
std::vector<TransactionTag> getTagsAffectingStorageServer(UID storageServerId) const {
std::vector<TransactionTag> result;
auto const tagToThroughputCounters = get(throughput, storageServerId);
if (!tagToThroughputCounters.present()) {
return {};
} else {
for (const auto& [t, _] : tagToThroughputCounters.get()) {
return result;
Optional<double> getQuota(TransactionTag tag, OpType opType, LimitType limitType) const {
auto const stats = get(tagStatistics, tag);
if (!stats.present()) {
return {};
auto const quota = stats.get().getQuota();
if (!quota.present()) {
return {};
if (limitType == LimitType::TOTAL) {
return (opType == OpType::READ) ? quota.get().totalReadQuota : quota.get().totalWriteQuota;
} else {
return (opType == OpType::READ) ? quota.get().reservedReadQuota : quota.get().reservedWriteQuota;
// Of all tags meaningfully performing workload on the given storage server,
// returns the ratio of total quota allocated to the specified tag
double getQuotaRatio(TransactionTagRef tag, UID storageServerId, OpType opType) const {
double sumQuota{ 0.0 };
double tagQuota{ 0.0 };
auto const tagsAffectingStorageServer = getTagsAffectingStorageServer(storageServerId);
for (const auto& t : tagsAffectingStorageServer) {
auto const tQuota = getQuota(t, opType, LimitType::TOTAL);
sumQuota += tQuota.orDefault(0);
if ( == 0) {
tagQuota = tQuota.orDefault(0);
if (tagQuota == 0.0) {
return 0;
ASSERT_GT(sumQuota, 0.0);
return tagQuota / sumQuota;
// Returns the limiting cost for a storage server: its current cost rate (summed
// across all tags) multiplied by its throttling ratio.
// Returns an empty Optional when the storage server has no throttling-ratio entry,
// when that entry holds no ratio, or when no current cost is tracked for the server.
Optional<double> getLimitingCost(UID storageServerId, OpType opType) const {
auto const throttlingRatio = get(throttlingRatios, storageServerId);
auto const currentCost = getCurrentCost(storageServerId, opType);
// Every input must be present before it is dereferenced below. The current-cost
// check is negated like the other two: a missing cost must bail out here rather
// than reach currentCost.get(), and a known cost must be allowed through.
if (!throttlingRatio.present() || !currentCost.present() || !throttlingRatio.get().present()) {
return {};
return throttlingRatio.get().get() * currentCost.get();
// For a given storage server and tag combination, return the limiting transaction rate.
Optional<double> getLimitingTps(UID storageServerId, TransactionTag tag, OpType opType) {
auto const quotaRatio = getQuotaRatio(tag, storageServerId, opType);
auto const limitingCost = getLimitingCost(storageServerId, opType);
auto const averageTransactionCost = getAverageTransactionCost(tag, storageServerId, opType);
if (!limitingCost.present() || !averageTransactionCost.present()) {
return {};
auto const limitingCostForTag = limitingCost.get() * quotaRatio;
return limitingCostForTag / averageTransactionCost.get();
// Return the limiting transaction rate for the given tag and operation type,
// aggregated across all storage servers that have a throttling-ratio entry.
// The result is the minimum of the per-storage-server limiting rates; storage
// servers that yield no limit are ignored. Returns an empty Optional when no
// storage server yields a limit.
Optional<double> getLimitingTps(TransactionTag tag, OpType opType) {
Optional<double> result;
for (const auto& [id, _] : throttlingRatios) {
auto const targetTpsForSS = getLimitingTps(id, tag, opType);
if (result.present() && targetTpsForSS.present()) {
result = std::min(result.get(), targetTpsForSS.get());
// Only adopt a limit that actually exists: assigning an empty Optional here would
// discard the minimum accumulated from earlier storage servers and make the
// result depend on iteration order.
} else if (targetTpsForSS.present()) {
result = targetTpsForSS;
return result;
Optional<double> getLimitingTps(TransactionTag tag) {
auto const readLimitingTps = getLimitingTps(tag, OpType::READ);
auto const writeLimitingTps = getLimitingTps(tag, OpType::WRITE);
if (readLimitingTps.present() && writeLimitingTps.present()) {
return std::min(readLimitingTps.get(), writeLimitingTps.get());
} else if (readLimitingTps.present()) {
return readLimitingTps;
} else {
return writeLimitingTps;
Optional<double> getDesiredTps(TransactionTag tag, OpType opType) const {
auto const averageTransactionCost = getAverageTransactionCost(tag, opType);
if (!averageTransactionCost.present() || averageTransactionCost.get() == 0) {
return {};
auto const stats = get(tagStatistics, tag);
if (!stats.present()) {
return {};
auto const quota = stats.get().getQuota();
if (!quota.present()) {
return {};
auto const desiredCost = (opType == OpType::READ) ? quota.get().totalReadQuota : quota.get().totalWriteQuota;
return desiredCost / averageTransactionCost.get();
Optional<double> getDesiredTps(TransactionTag tag) const {
auto const readDesiredTps = getDesiredTps(tag, OpType::READ);
auto const writeDesiredTps = getDesiredTps(tag, OpType::WRITE);
if (readDesiredTps.present() && writeDesiredTps.present()) {
return std::min(readDesiredTps.get(), writeDesiredTps.get());
} else if (readDesiredTps.present()) {
return readDesiredTps;
} else {
return writeDesiredTps;
Optional<double> getReservedTps(TransactionTag tag, OpType opType) const {
auto const reservedCost = getQuota(tag, opType, LimitType::RESERVED);
auto const averageTransactionCost = getAverageTransactionCost(tag, opType);
if (!reservedCost.present() || !averageTransactionCost.present() || averageTransactionCost.get() == 0) {
return {};
} else {
return reservedCost.get() / averageTransactionCost.get();
Optional<double> getReservedTps(TransactionTag tag) const {
auto const readReservedTps = getReservedTps(tag, OpType::READ);
auto const writeReservedTps = getReservedTps(tag, OpType::WRITE);
if (readReservedTps.present() && writeReservedTps.present()) {
return std::max(readReservedTps.get(), writeReservedTps.get());
} else if (readReservedTps.present()) {
return readReservedTps;
} else {
return writeReservedTps;
void removeUnseenTags(std::unordered_set<TransactionTag> const& seenTags) {
std::map<TransactionTag, QuotaAndCounters>::iterator it = trackedTags.begin();
while (it != trackedTags.end()) {
std::unordered_map<TransactionTag, PerTagStatistics>::iterator it = tagStatistics.begin();
while (it != tagStatistics.end()) {
auto current = it++;
auto const tag = current->first;
if (seenTags.find(tag) == seenTags.end()) {
if (tagStatistics.find(tag) == tagStatistics.end()) {
@ -230,7 +398,7 @@ class GlobalTagThrottlerImpl {
for (auto const kv : currentQuotas) {
auto const tag = kv.key.removePrefix(tagQuotaPrefix);
auto const quota = ThrottleApi::TagQuotaValue::fromValue(kv.value);
@ -247,33 +415,31 @@ class GlobalTagThrottlerImpl {
Optional<double> getMaxCostRate(TransactionTagRef tag, OpType opType) const {
if (throttlingRatio.present()) {
auto const desiredTotalCostRate = throttlingRatio.get() * getTotalCostRate(opType);
return desiredTotalCostRate * getQuotaRatio(tag, opType);
} else {
return {};
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) { traceActor = tracer(this); }
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) {}
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); }
void addRequests(TransactionTag tag, int count) { tagStatistics[tag].addTransactions(count); }
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
// Builds the map of per-client throttle limits, keyed by transaction priority and tag.
// The same limit is stored under both BATCH and DEFAULT priority for each tag.
// NOTE(review): this span appears to interleave two revisions of the function (one
// iterating trackedTags, one iterating tagStatistics) and its final assignment is
// truncated — confirm against the repository before relying on it.
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
for (auto& [tag, quotaAndCounters] : trackedTags) {
for (auto& [tag, stats] : tagStatistics) {
// Currently there is no differentiation between batch priority and default priority transactions
auto const limit = quotaAndCounters.updateAndGetPerClientLimit(getMaxCostRate(tag, OpType::READ),
getMaxCostRate(tag, OpType::WRITE));
if (limit.present()) {
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
auto const limitingTps = getLimitingTps(tag);
auto const desiredTps = getDesiredTps(tag);
auto const reservedTps = getReservedTps(tag);
if (!limitingTps.present() || !desiredTps.present() || !reservedTps.present()) {
// NOTE(review): this returns an empty map from inside the per-tag loop, dropping any
// limits already stored in `result` for earlier tags; skipping only this tag
// (`continue`) looks like the intent — confirm.
return {};
} else {
// The target is the limiting/desired minimum, but never below the reserved rate.
// NOTE(review): the inputs are TPS values although the local is named targetCost.
auto const targetCost = std::max(reservedTps.get(), std::min(limitingTps.get(), desiredTps.get()));
auto const perClientLimit = stats.updateAndGetPerClientLimit(targetCost);
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] =
return result;
int64_t autoThrottleCount() const { return trackedTags.size(); }
// FIXME: Only count tags that have quota set
int64_t autoThrottleCount() const { return tagStatistics.size(); }
uint32_t busyReadTagCount() const {
// TODO: Implement
return 0;
@ -282,27 +448,25 @@ public:
// TODO: Implement
return 0;
int64_t manualThrottleCount() const { return trackedTags.size(); }
int64_t manualThrottleCount() const { return 0; }
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
for (const auto& busyReadTag : ss.busiestReadTags) {
trackedTags[busyReadTag.tag].updateReadCostRate(, busyReadTag.rate);
throughput[][busyReadTag.tag].updateCost(busyReadTag.rate, OpType::READ);
for (const auto& busyWriteTag : ss.busiestWriteTags) {
trackedTags[busyWriteTag.tag].updateWriteCostRate(, busyWriteTag.rate);
throughput[][busyWriteTag.tag].updateCost(busyWriteTag.rate, OpType::WRITE);
return Void();
void setThrottlingRatio(Optional<double> ratio) {
throttlingRatio = ratio;
void setThrottlingRatio(UID storageServerId, Optional<double> ratio) { throttlingRatios[storageServerId] = ratio; }
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
void removeQuota(TransactionTagRef tag) { trackedTags.erase(tag); }
void removeQuota(TransactionTagRef tag) { tagStatistics[tag].clearQuota(); }
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
@ -339,8 +503,9 @@ bool GlobalTagThrottler::isAutoThrottlingEnabled() const {
Future<Void> GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
return impl->tryUpdateAutoThrottling(ss);
void GlobalTagThrottler::setThrottlingRatio(Optional<double> ratio) {
return impl->setThrottlingRatio(ratio);
void GlobalTagThrottler::setThrottlingRatio(UID storageServerId, Optional<double> ratio) {
return impl->setThrottlingRatio(storageServerId, ratio);
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
@ -455,15 +620,10 @@ public:
return result;
Optional<double> getWorstThrottlingRatio() const {
Optional<double> result;
for (const auto& storageServer : storageServers) {
auto const throttlingRatio = storageServer.getThrottlingRatio();
if (result.present() && throttlingRatio.present()) {
result = std::max(result.get(), throttlingRatio.get());
} else if (throttlingRatio.present()) {
result = throttlingRatio.get();
std::map<UID, Optional<double>> getThrottlingRatios() const {
std::map<UID, Optional<double>> result;
for (int i = 0; i < storageServers.size(); ++i) {
result[UID(i, i)] = storageServers[i].getThrottlingRatio();
return result;
@ -531,7 +691,10 @@ ACTOR static Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTag
for (const auto& sq : storageQueueInfos) {
auto const throttlingRatios = storageServers->getThrottlingRatios();
for (const auto& [id, ratio] : throttlingRatios) {
globalTagThrottler->setThrottlingRatio(id, ratio);

View File

@ -80,19 +80,21 @@ public:
Future<Void> monitorThrottlingChanges() override;
void addRequests(TransactionTag tag, int count) override;
uint64_t getThrottledTagChangeId() const override;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
int64_t autoThrottleCount() const override;
uint32_t busyReadTagCount() const override;
uint32_t busyWriteTagCount() const override;
int64_t manualThrottleCount() const override;
bool isAutoThrottlingEnabled() const override;
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
// Based on limiting storage queue size, set a ratio by which total throughput needs to be
// Based on limiting storage queue size, set a ratio by which total throughput on the storage server needs to be
// adjusted
void setThrottlingRatio(Optional<double>);
void setThrottlingRatio(UID storageServerId, Optional<double> ratio);
// testing only
// Testing only:
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
void removeQuota(TransactionTagRef);