diff --git a/fdbcli/QuotaCommand.actor.cpp b/fdbcli/QuotaCommand.actor.cpp new file mode 100644 index 0000000000..ba8546fa15 --- /dev/null +++ b/fdbcli/QuotaCommand.actor.cpp @@ -0,0 +1,178 @@ +/* + * QuotaCommand.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbcli/fdbcli.actor.h" +#include "flow/actorcompiler.h" // This must be the last include + +namespace { + +enum class LimitType { RESERVED, TOTAL }; + +enum class OpType { READ, WRITE }; + +Optional parseTag(StringRef token) { + if (token.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) { + return {}; + } else { + return token; + } +} + +Optional parseLimitType(StringRef token) { + if (token == "reserved"_sr) { + return LimitType::RESERVED; + } else if (token == "total"_sr) { + return LimitType::TOTAL; + } else { + return {}; + } +} + +Optional parseOpType(StringRef token) { + if (token == "read"_sr) { + return OpType::READ; + } else if (token == "write"_sr) { + return OpType::WRITE; + } else { + return {}; + } +} + +Optional parseLimitValue(StringRef token) { + try { + return std::stod(token.toString()); + } catch (...) { + return {}; + } +} + +ACTOR Future getQuota(Reference db, TransactionTag tag, LimitType limitType, OpType opType) { + state Reference tr = db->createTransaction(); + loop { + tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + try { + state ThreadFuture> resultFuture = tr->get(tag.withPrefix(tagQuotaPrefix)); + Optional v = wait(safeThreadFutureToFuture(resultFuture)); + if (!v.present()) { + fmt::print("\n"); + } else { + auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get()); + if (limitType == LimitType::TOTAL && opType == OpType::READ) { + fmt::print("{}\n", quota.totalReadQuota); + } else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) { + fmt::print("{}\n", quota.totalWriteQuota); + } else if (limitType == LimitType::RESERVED && opType == OpType::READ) { + fmt::print("{}\n", quota.reservedReadQuota); + } else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) { + fmt::print("{}\n", quota.reservedWriteQuota); + } + } + return Void(); + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } +} + +ACTOR Future setQuota(Reference db, + TransactionTag tag, + LimitType limitType, + OpType opType, + double value) { + state Reference tr = db->createTransaction(); + state Key key = tag.withPrefix(tagQuotaPrefix); + loop { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + try { + state ThreadFuture> resultFuture = tr->get(key); + Optional v = wait(safeThreadFutureToFuture(resultFuture)); + ThrottleApi::TagQuotaValue quota; + if (v.present()) { + quota = ThrottleApi::TagQuotaValue::fromValue(v.get()); + } + if (limitType == LimitType::TOTAL && opType == OpType::READ) { + quota.totalReadQuota = value; + } else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) { + quota.totalWriteQuota = value; + } else if (limitType == LimitType::RESERVED && opType == OpType::READ) { + quota.reservedReadQuota = value; + } else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) { + quota.reservedWriteQuota = value; + } + ThrottleApi::setTagQuota(tr, + tag, + quota.reservedReadQuota, + quota.totalReadQuota, + quota.reservedWriteQuota, + quota.totalWriteQuota); + wait(safeThreadFutureToFuture(tr->commit())); + return Void(); + } catch (Error& e) { + wait(safeThreadFutureToFuture(tr->onError(e))); + } + } +} + +constexpr auto usage = + "quota [get [reserved|total] [read|write]|set [reserved|total] [read|write] ]"; + +bool exitFailure() { + fmt::print(usage); + return false; +} + +} // namespace + +namespace fdb_cli { + +ACTOR Future quotaCommandActor(Reference db, std::vector tokens) { + state bool result = true; + if (tokens.size() != 5 && tokens.size() != 6) { + return exitFailure(); + } else { + auto tag = parseTag(tokens[2]); + auto limitType = parseLimitType(tokens[3]); + auto opType = parseOpType(tokens[4]); + if (!tag.present() || !limitType.present() || !opType.present()) { + return exitFailure(); + } + if (tokens[1] == "get"_sr) { + if (tokens.size() != 5) { + return exitFailure(); + } + wait(getQuota(db, tag.get(), limitType.get(), opType.get())); + return true; + } else if (tokens[1] == "set"_sr) { + if (tokens.size() != 6) { + return exitFailure(); + } + auto const limitValue = parseLimitValue(tokens[5]); + if (!limitValue.present()) { + return exitFailure(); + } + wait(setQuota(db, tag.get(), limitType.get(), opType.get(), limitValue.get())); + return true; + } else { + return exitFailure(); + } + } +} + +} // namespace fdb_cli diff --git a/fdbcli/fdbcli.actor.cpp b/fdbcli/fdbcli.actor.cpp index 4191f39fa7..9669485154 100644 --- a/fdbcli/fdbcli.actor.cpp +++ b/fdbcli/fdbcli.actor.cpp @@ -509,6 +509,10 @@ void initHelp() { CommandHelp("getversion", "Fetch the current read version", "Displays the current read version of the database or currently running transaction."); + helpMap["quota"] = + CommandHelp("quota", + "quota [get [reserved|total] [read|write]|set [reserved|total] [read|write] ]", + "Get or modify the throughput quota for the specified tag."); helpMap["reset"] = CommandHelp("reset", "reset the current transaction", @@ -1468,6 +1472,14 @@ ACTOR Future cli(CLIOptions opt, LineNoise* plinenoise) { continue; } + if (tokencmp(tokens[0], "quota")) { + bool _result = wait(makeInterruptable(quotaCommandActor(db, tokens))); + if (!_result) { + is_error = true; + } + continue; + } + if (tokencmp(tokens[0], "reset")) { if (tokens.size() != 1) { printUsage(tokens[0]); diff --git a/fdbcli/include/fdbcli/fdbcli.actor.h b/fdbcli/include/fdbcli/fdbcli.actor.h index ec443e4a19..28afe963f0 100644 --- a/fdbcli/include/fdbcli/fdbcli.actor.h +++ b/fdbcli/include/fdbcli/fdbcli.actor.h @@ -218,6 +218,8 @@ ACTOR Future profileCommandActor(Database db, Reference tr, std::vector tokens, bool intrans); +// quota command +ACTOR Future quotaCommandActor(Reference db, std::vector tokens); // setclass command ACTOR Future setClassCommandActor(Reference db, std::vector tokens); // snapshot command diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 1fd857060f..beb0f1fb3d 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -5151,8 +5151,9 @@ Future> Transaction::get(const Key& key, Snapshot snapshot) { if (!ver.isReady() || metadataVersion.isSet()) { return metadataVersion.getFuture(); } else { - if (ver.isError()) + if (ver.isError()) { return ver.getError(); + } if (ver.get() == trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) { return trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].second; } @@ -5756,6 +5757,10 @@ void Transaction::resetImpl(bool generateNewSpan) { cancelWatches(); } +TagSet const& Transaction::getTags() const { + return trState->options.tags; +} + void Transaction::reset() { resetImpl(false); } @@ -7060,6 +7065,25 @@ Future DatabaseContext::getClusterProtocol(OptionalTAG_THROTTLE_SMOOTHING_WINDOW; + + if (capacity >= 1) { + return 0.0; + } + + if (tpsRate == 0) { + return std::max(0.0, expiration - now()); + } + + return std::min(expiration - now(), capacity / tpsRate); +} + uint32_t Transaction::getSize() { auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() + tr.transaction.write_conflict_ranges.expectedSize(); diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 39aacdcdb9..cd8b0a97f1 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -662,6 +662,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( AUTO_TAG_THROTTLE_UPDATE_FREQUENCY, 10.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_UPDATE_FREQUENCY = 0.5; init( TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL, 30.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL = 1.0; init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false; + init( SS_THROTTLE_TAGS_TRACKED, 1 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10); + init( GLOBAL_TAG_THROTTLING, false ); + init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 ); + init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 ); + init( GLOBAL_TAG_THROTTLING_TRACE_INTERVAL, 5.0 ); //Storage Metrics init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 ); diff --git a/fdbclient/TagThrottle.actor.cpp b/fdbclient/TagThrottle.actor.cpp index 16c0b0489c..b5205fd153 100644 --- a/fdbclient/TagThrottle.actor.cpp +++ b/fdbclient/TagThrottle.actor.cpp @@ -18,12 +18,16 @@ * limitations under the License. */ -#include "fdbclient/TagThrottle.actor.h" #include "fdbclient/CommitProxyInterface.h" #include "fdbclient/DatabaseContext.h" +#include "fdbclient/SystemData.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbclient/Tuple.h" #include "flow/actorcompiler.h" // has to be last include +double const ClientTagThrottleLimits::NO_EXPIRATION = std::numeric_limits::max(); + void TagSet::addTag(TransactionTagRef tag) { ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); // Tag length is encoded with a single byte ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION < 256); // Number of tags is encoded with a single byte @@ -124,6 +128,53 @@ TagThrottleValue TagThrottleValue::fromValue(const ValueRef& value) { return throttleValue; } +KeyRangeRef const tagQuotaKeys = KeyRangeRef("\xff/tagQuota/"_sr, "\xff/tagQuota0"_sr); +KeyRef const tagQuotaPrefix = tagQuotaKeys.begin; + +Key ThrottleApi::getTagQuotaKey(TransactionTagRef tag) { + return tag.withPrefix(tagQuotaPrefix); +} + +bool ThrottleApi::TagQuotaValue::isValid() const { + return reservedReadQuota <= totalReadQuota && reservedWriteQuota <= totalWriteQuota && reservedReadQuota >= 0 && + reservedWriteQuota >= 0; +} + +Value ThrottleApi::TagQuotaValue::toValue() const { + Tuple tuple; + tuple.appendDouble(reservedReadQuota); + tuple.appendDouble(totalReadQuota); + tuple.appendDouble(reservedWriteQuota); + tuple.appendDouble(totalWriteQuota); + return tuple.pack(); +} + +ThrottleApi::TagQuotaValue ThrottleApi::TagQuotaValue::fromValue(ValueRef value) { + auto tuple = Tuple::unpack(value); + if (tuple.size() != 4) { + throw invalid_throttle_quota_value(); + } + TagQuotaValue result; + try { + result.reservedReadQuota = tuple.getDouble(0); + result.totalReadQuota = tuple.getDouble(1); + result.reservedWriteQuota = tuple.getDouble(2); + result.totalWriteQuota = tuple.getDouble(3); + } catch (Error& e) { + TraceEvent(SevWarnAlways, "TagQuotaValueFailedToDeserialize").error(e); + throw invalid_throttle_quota_value(); + } + if (!result.isValid()) { + TraceEvent(SevWarnAlways, "TagQuotaValueInvalidQuotas") + .detail("ReservedReadQuota", result.reservedReadQuota) + .detail("TotalReadQuota", result.totalReadQuota) + .detail("ReservedWriteQuota", result.reservedWriteQuota) + .detail("TotalWriteQuota", result.totalWriteQuota); + throw invalid_throttle_quota_value(); + } + return result; +} + FDB_DEFINE_BOOLEAN_PARAM(ContainsRecommended); FDB_DEFINE_BOOLEAN_PARAM(Capitalize); diff --git a/fdbclient/include/fdbclient/DatabaseContext.h b/fdbclient/include/fdbclient/DatabaseContext.h index c2fbb78ed6..ecc0d1436c 100644 --- a/fdbclient/include/fdbclient/DatabaseContext.h +++ b/fdbclient/include/fdbclient/DatabaseContext.h @@ -116,23 +116,7 @@ public: bool canRecheck() const { return lastCheck < now() - CLIENT_KNOBS->TAG_THROTTLE_RECHECK_INTERVAL; } - double throttleDuration() const { - if (expiration <= now()) { - return 0.0; - } - - double capacity = - (smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW; - if (capacity >= 1) { - return 0.0; - } - - if (tpsRate == 0) { - return std::max(0.0, expiration - now()); - } - - return std::min(expiration - now(), capacity / tpsRate); - } + double throttleDuration() const; }; struct WatchParameters : public ReferenceCounted { diff --git a/fdbclient/include/fdbclient/NativeAPI.actor.h b/fdbclient/include/fdbclient/NativeAPI.actor.h index ec5ba530a2..009c22d7cd 100644 --- a/fdbclient/include/fdbclient/NativeAPI.actor.h +++ b/fdbclient/include/fdbclient/NativeAPI.actor.h @@ -465,6 +465,7 @@ public: Reference trState; std::vector> watches; + TagSet const& getTags() const; Span span; // used in template functions as returned Future type diff --git a/fdbclient/include/fdbclient/ReadYourWrites.h b/fdbclient/include/fdbclient/ReadYourWrites.h index 6ddf892774..89de979bc1 100644 --- a/fdbclient/include/fdbclient/ReadYourWrites.h +++ b/fdbclient/include/fdbclient/ReadYourWrites.h @@ -196,6 +196,7 @@ public: Transaction& getTransaction() { return tr; } Optional getTenant() { return tr.getTenant(); } + TagSet const& getTags() const { return tr.getTags(); } // used in template functions as returned Future type template diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index 4b66bed6cb..c3ba0bf8c6 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -564,6 +564,7 @@ public: int64_t TLOG_RECOVER_MEMORY_LIMIT; double TLOG_IGNORE_POP_AUTO_ENABLE_DELAY; + // Tag throttling int64_t MAX_MANUAL_THROTTLED_TRANSACTION_TAGS; int64_t MAX_AUTO_THROTTLED_TRANSACTION_TAGS; double MIN_TAG_COST; @@ -576,6 +577,17 @@ public: double AUTO_TAG_THROTTLE_UPDATE_FREQUENCY; double TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL; bool AUTO_TAG_THROTTLING_ENABLED; + // Limit to the number of throttling tags each storage server + // will track and send to the ratekeeper + int64_t SS_THROTTLE_TAGS_TRACKED; + // Use global tag throttling strategy. i.e. throttle based on the cluster-wide + // throughput for tags and their associated quotas. + bool GLOBAL_TAG_THROTTLING; + // Minimum number of transactions per second that the global tag throttler must allow for each tag + double GLOBAL_TAG_THROTTLING_MIN_RATE; + // Used by global tag throttling counters + double GLOBAL_TAG_THROTTLING_FOLDING_TIME; + double GLOBAL_TAG_THROTTLING_TRACE_INTERVAL; double MAX_TRANSACTIONS_PER_BYTE; diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 33c5ff266d..85516a6bff 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -389,6 +389,8 @@ extern const KeyRef tagThrottleSignalKey; extern const KeyRef tagThrottleAutoEnabledKey; extern const KeyRef tagThrottleLimitKey; extern const KeyRef tagThrottleCountKey; +extern const KeyRangeRef tagQuotaKeys; +extern const KeyRef tagQuotaPrefix; // Log Range constant variables // Used in the backup pipeline to track mutations diff --git a/fdbclient/include/fdbclient/TagThrottle.actor.h b/fdbclient/include/fdbclient/TagThrottle.actor.h index 3330abb4d9..020fcea568 100644 --- a/fdbclient/include/fdbclient/TagThrottle.actor.h +++ b/fdbclient/include/fdbclient/TagThrottle.actor.h @@ -207,6 +207,8 @@ struct ClientTagThrottleLimits { double tpsRate; double expiration; + static double const NO_EXPIRATION; + ClientTagThrottleLimits() : tpsRate(0), expiration(0) {} ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {} @@ -595,6 +597,38 @@ Future enableAuto(Reference db, bool enabled) { } } +class TagQuotaValue { +public: + double reservedReadQuota{ 0.0 }; + double totalReadQuota{ 0.0 }; + double reservedWriteQuota{ 0.0 }; + double totalWriteQuota{ 0.0 }; + bool isValid() const; + Value toValue() const; + static TagQuotaValue fromValue(ValueRef); +}; + +Key getTagQuotaKey(TransactionTagRef); + +template +void setTagQuota(Reference tr, + TransactionTagRef tag, + double reservedReadQuota, + double totalReadQuota, + double reservedWriteQuota, + double totalWriteQuota) { + TagQuotaValue tagQuotaValue; + tagQuotaValue.reservedReadQuota = reservedReadQuota; + tagQuotaValue.totalReadQuota = totalReadQuota; + tagQuotaValue.reservedWriteQuota = reservedWriteQuota; + tagQuotaValue.totalWriteQuota = totalWriteQuota; + if (!tagQuotaValue.isValid()) { + throw invalid_throttle_quota_value(); + } + tr->set(getTagQuotaKey(tag), tagQuotaValue.toValue()); + signalThrottleChange(tr); +} + }; // namespace ThrottleApi template diff --git a/fdbserver/GlobalTagThrottler.actor.cpp b/fdbserver/GlobalTagThrottler.actor.cpp new file mode 100644 index 0000000000..2574a96277 --- /dev/null +++ b/fdbserver/GlobalTagThrottler.actor.cpp @@ -0,0 +1,533 @@ +/* + * GlobalTagThrottler.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/FDBTypes.h" +#include "fdbclient/TagThrottle.actor.h" +#include "fdbrpc/Smoother.h" +#include "fdbserver/TagThrottler.h" + +#include + +#include "flow/actorcompiler.h" // must be last include + +class GlobalTagThrottlerImpl { + class QuotaAndCounters { + Optional quota; + std::unordered_map ssToReadCostRate; + std::unordered_map ssToWriteCostRate; + Smoother totalReadCostRate; + Smoother totalWriteCostRate; + Smoother transactionCounter; + Smoother perClientRate; + + Optional getReadTPSLimit() const { + if (totalReadCostRate.smoothTotal() > 0) { + return quota.get().totalReadQuota * transactionCounter.smoothRate() / totalReadCostRate.smoothTotal(); + } else { + return {}; + } + } + + Optional getWriteTPSLimit() const { + if (totalWriteCostRate.smoothTotal() > 0) { + return quota.get().totalWriteQuota * transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal(); + } else { + return {}; + } + } + + public: + QuotaAndCounters() + : totalReadCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), + totalWriteCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), + transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME), + perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {} + + void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; } + + void updateReadCostRate(UID ssId, double newReadCostRate) { + auto& currentReadCostRate = ssToReadCostRate[ssId]; + auto diff = newReadCostRate - currentReadCostRate; + currentReadCostRate += diff; + totalReadCostRate.addDelta(diff); + } + + void updateWriteCostRate(UID ssId, double newWriteCostRate) { + auto& currentWriteCostRate = ssToWriteCostRate[ssId]; + auto diff = newWriteCostRate - currentWriteCostRate; + currentWriteCostRate += diff; + totalWriteCostRate.addDelta(diff); + } + + void addTransactions(int count) { transactionCounter.addDelta(count); } + + Optional getTargetTotalTPSLimit() const { + if (!quota.present()) + return {}; + auto readLimit = getReadTPSLimit(); + auto writeLimit = getWriteTPSLimit(); + + // TODO: Implement expiration logic + if (!readLimit.present() && !writeLimit.present()) { + return {}; + } else { + if (!readLimit.present()) { + return writeLimit.get(); + } else if (!writeLimit.present()) { + return readLimit.get(); + } else { + return std::min(readLimit.get(), writeLimit.get()); + } + } + } + + Optional updateAndGetPerClientLimit() { + auto targetRate = getTargetTotalTPSLimit(); + if (targetRate.present() && transactionCounter.smoothRate() > 0) { + auto newPerClientRate = std::max( + SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE, + std::min(targetRate.get(), + (targetRate.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal())); + perClientRate.setTotal(newPerClientRate); + return ClientTagThrottleLimits(perClientRate.getTotal(), ClientTagThrottleLimits::NO_EXPIRATION); + } else { + return {}; + } + } + + void processTraceEvent(TraceEvent& te) const { + if (quota.present()) { + te.detail("ProvidedReadTPSLimit", getReadTPSLimit()) + .detail("ProvidedWriteTPSLimit", getWriteTPSLimit()) + .detail("ReadCostRate", totalReadCostRate.smoothTotal()) + .detail("WriteCostRate", totalWriteCostRate.smoothTotal()) + .detail("TotalReadQuota", quota.get().totalReadQuota) + .detail("ReservedReadQuota", quota.get().reservedReadQuota) + .detail("TotalWriteQuota", quota.get().totalWriteQuota) + .detail("ReservedWriteQuota", quota.get().reservedWriteQuota); + } + } + }; + + Database db; + UID id; + std::map trackedTags; + uint64_t throttledTagChangeId{ 0 }; + Future traceActor; + + ACTOR static Future tracer(GlobalTagThrottlerImpl const* self) { + loop { + for (const auto& [tag, quotaAndCounters] : self->trackedTags) { + TraceEvent te("GlobalTagThrottling"); + te.detail("Tag", tag); + quotaAndCounters.processTraceEvent(te); + } + wait(delay(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TRACE_INTERVAL)); + } + } + + ACTOR static Future monitorThrottlingChanges(GlobalTagThrottlerImpl* self) { + loop { + state ReadYourWritesTransaction tr(self->db); + + loop { + // TODO: Clean up quotas that have been removed + try { + tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + + state RangeResult currentQuotas = wait(tr.getRange(tagQuotaKeys, CLIENT_KNOBS->TOO_MANY)); + TraceEvent("GlobalTagThrottler_ReadCurrentQuotas").detail("Size", currentQuotas.size()); + for (auto const kv : currentQuotas) { + auto const tag = kv.key.removePrefix(tagQuotaPrefix); + auto const quota = ThrottleApi::TagQuotaValue::fromValue(kv.value); + self->trackedTags[tag].setQuota(quota); + } + + ++self->throttledTagChangeId; + // FIXME: Should wait on watch instead + // wait(tr.watch(tagThrottleSignalKey)); + wait(delay(5.0)); + TraceEvent("GlobalTagThrottler_ChangeSignaled"); + TEST(true); // Global tag throttler detected quota changes + break; + } catch (Error& e) { + TraceEvent("GlobalTagThrottlerMonitoringChangesError", self->id).error(e); + wait(tr.onError(e)); + } + } + } + } + +public: + GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) { traceActor = tracer(this); } + Future monitorThrottlingChanges() { return monitorThrottlingChanges(this); } + void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); } + uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; } + PrioritizedTransactionTagMap getClientRates() { + // TODO: For now, only enforce total throttling rates. + // We should use reserved quotas as well. + PrioritizedTransactionTagMap result; + for (auto& [tag, quotaAndCounters] : trackedTags) { + // Currently there is no differentiation between batch priority and default priority transactions + auto const limit = quotaAndCounters.updateAndGetPerClientLimit(); + if (limit.present()) { + result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get(); + } + } + return result; + } + int64_t autoThrottleCount() const { return trackedTags.size(); } + uint32_t busyReadTagCount() const { + // TODO: Implement + return 0; + } + uint32_t busyWriteTagCount() const { + // TODO: Implement + return 0; + } + int64_t manualThrottleCount() const { return trackedTags.size(); } + Future tryUpdateAutoThrottling(StorageQueueInfo const& ss) { + for (const auto& busyReadTag : ss.busiestReadTags) { + trackedTags[busyReadTag.tag].updateReadCostRate(ss.id, busyReadTag.rate); + } + for (const auto& busyWriteTag : ss.busiestWriteTags) { + trackedTags[busyWriteTag.tag].updateWriteCostRate(ss.id, busyWriteTag.rate); + } + // TODO: Call ThrottleApi::throttleTags + return Void(); + } + + void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) { + trackedTags[tag].setQuota(tagQuotaValue); + } +}; + +GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl::create(db, id)) {} + +GlobalTagThrottler::~GlobalTagThrottler() = default; + +Future GlobalTagThrottler::monitorThrottlingChanges() { + return impl->monitorThrottlingChanges(); +} +void GlobalTagThrottler::addRequests(TransactionTag tag, int count) { + return impl->addRequests(tag, count); +} +uint64_t GlobalTagThrottler::getThrottledTagChangeId() const { + return impl->getThrottledTagChangeId(); +} +PrioritizedTransactionTagMap GlobalTagThrottler::getClientRates() { + return impl->getClientRates(); +} +int64_t GlobalTagThrottler::autoThrottleCount() const { + return impl->autoThrottleCount(); +} +uint32_t GlobalTagThrottler::busyReadTagCount() const { + return impl->busyReadTagCount(); +} +uint32_t GlobalTagThrottler::busyWriteTagCount() const { + return impl->busyWriteTagCount(); +} +int64_t GlobalTagThrottler::manualThrottleCount() const { + return impl->manualThrottleCount(); +} +bool GlobalTagThrottler::isAutoThrottlingEnabled() const { + return true; +} +Future GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) { + return impl->tryUpdateAutoThrottling(ss); +} + +void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) { + return impl->setQuota(tag, tagQuotaValue); +} + +namespace GlobalTagThrottlerTesting { + +Optional getTPSLimit(GlobalTagThrottler& globalTagThrottler, TransactionTag tag) { + auto clientRates = globalTagThrottler.getClientRates(); + auto it1 = clientRates.find(TransactionPriority::DEFAULT); + if (it1 != clientRates.end()) { + auto it2 = it1->second.find(tag); + if (it2 != it1->second.end()) { + return it2->second.tpsRate; + } + } + return {}; +} + +class StorageServerCollection { + class Cost { + Smoother smoother; + + public: + Cost() : smoother(5.0) {} + Cost& operator+=(double delta) { + smoother.addDelta(delta); + return *this; + } + double smoothRate() const { return smoother.smoothRate(); } + }; + + std::vector> readCosts; + std::vector> writeCosts; + +public: + StorageServerCollection(size_t size) : readCosts(size), writeCosts(size) { ASSERT_GT(size, 0); } + + void addReadCost(TransactionTag tag, double cost) { + auto const costPerSS = cost / readCosts.size(); + for (auto& readCost : readCosts) { + readCost[tag] += costPerSS; + } + } + + void addWriteCost(TransactionTag tag, double cost) { + auto const costPerSS = cost / writeCosts.size(); + for (auto& writeCost : writeCosts) { + writeCost[tag] += costPerSS; + } + } + + std::vector getStorageQueueInfos() const { + std::vector result; + result.reserve(readCosts.size()); + for (int i = 0; i < readCosts.size(); ++i) { + StorageQueueInfo sqInfo(UID(i, i), LocalityData{}); + for (const auto& [tag, readCost] : readCosts[i]) { + double fractionalBusyness{ 0.0 }; // unused for global tag throttling + sqInfo.busiestReadTags.emplace_back(tag, readCost.smoothRate(), fractionalBusyness); + } + for (const auto& [tag, writeCost] : writeCosts[i]) { + double fractionalBusyness{ 0.0 }; // unused for global tag throttling + sqInfo.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness); + } + result.push_back(sqInfo); + } + return result; + } +}; + +ACTOR static Future runClient(GlobalTagThrottler* globalTagThrottler, + StorageServerCollection* storageServers, + TransactionTag tag, + double desiredTpsRate, + double costPerTransaction, + bool write) { + loop { + auto tpsLimit = getTPSLimit(*globalTagThrottler, tag); + state double tpsRate = tpsLimit.present() ? std::min(desiredTpsRate, tpsLimit.get()) : desiredTpsRate; + wait(delay(1 / tpsRate)); + if (write) { + storageServers->addWriteCost(tag, costPerTransaction); + } else { + storageServers->addReadCost(tag, costPerTransaction); + } + globalTagThrottler->addRequests(tag, 1); + } +} + +ACTOR static Future monitorClientRates(GlobalTagThrottler* globalTagThrottler, + TransactionTag tag, + double desiredTPSLimit) { + state int successes = 0; + loop { + wait(delay(1.0)); + auto currentTPSLimit = getTPSLimit(*globalTagThrottler, tag); + if (currentTPSLimit.present()) { + TraceEvent("GlobalTagThrottling_RateMonitor") + .detail("Tag", tag) + .detail("CurrentTPSRate", currentTPSLimit.get()) + .detail("DesiredTPSRate", desiredTPSLimit); + if (abs(currentTPSLimit.get() - desiredTPSLimit) < 0.1) { + if (++successes == 3) { + return Void(); + } + } else { + successes = 0; + } + } else { + successes = 0; + } + } +} + +ACTOR static Future updateGlobalTagThrottler(GlobalTagThrottler* globalTagThrottler, + StorageServerCollection const* storageServers) { + loop { + wait(delay(1.0)); + auto const storageQueueInfos = storageServers->getStorageQueueInfos(); + for (const auto& sq : storageQueueInfos) { + globalTagThrottler->tryUpdateAutoThrottling(sq); + } + } +} + +} // namespace GlobalTagThrottlerTesting + +TEST_CASE("/GlobalTagThrottler/Simple") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false); + state Future monitor = + GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} + +TEST_CASE("/GlobalTagThrottler/WriteThrottling") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalWriteQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, true); + state Future monitor = + GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} + +TEST_CASE("/GlobalTagThrottler/MultiTagThrottling") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag1 = "sampleTag1"_sr; + TransactionTag testTag2 = "sampleTag2"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag1, tagQuotaValue); + globalTagThrottler.setQuota(testTag2, tagQuotaValue); + state std::vector> futures; + state std::vector> monitorFutures; + futures.push_back( + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag1, 5.0, 6.0, false)); + futures.push_back( + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag2, 5.0, 6.0, false)); + futures.push_back(GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers)); + monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag1, 100.0 / 6.0)); + monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag2, 100.0 / 6.0)); + wait(timeoutError(waitForAny(futures) || waitForAll(monitorFutures), 300.0)); + return Void(); +} + +TEST_CASE("/GlobalTagThrottler/ActiveThrottling") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false); + state Future monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 10.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} + +TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false); + state Future client2 = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false); + state Future monitor = + GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} + +TEST_CASE("/GlobalTagThrottler/MultiClientActiveThrottling") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false); + state Future client2 = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false); + state Future monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 5.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} + +// Global transaction rate should be 20.0, with a distribution of (5, 15) between the 2 clients +TEST_CASE("/GlobalTagThrottler/SkewedMultiClientActiveThrottling") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + ThrottleApi::TagQuotaValue tagQuotaValue; + TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 5.0, false); + state Future client2 = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 25.0, 5.0, false); + state Future monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 15.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} + +// Test that the tag throttler can reach equilibrium, then adjust to a new equilibrium once the quota is changed +TEST_CASE("/GlobalTagThrottler/UpdateQuota") { + state GlobalTagThrottler globalTagThrottler(Database{}, UID{}); + state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10); + state ThrottleApi::TagQuotaValue tagQuotaValue; + state TransactionTag testTag = "sampleTag1"_sr; + tagQuotaValue.totalReadQuota = 100.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + state Future client = + GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false); + state Future monitor = + GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0); + state Future updater = + GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers); + wait(timeoutError(monitor || client || updater, 300.0)); + tagQuotaValue.totalReadQuota = 50.0; + globalTagThrottler.setQuota(testTag, tagQuotaValue); + monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 50.0 / 6.0); + wait(timeoutError(monitor || client || updater, 300.0)); + return Void(); +} diff --git a/fdbserver/Ratekeeper.actor.cpp b/fdbserver/Ratekeeper.actor.cpp index e1f1d0a163..73345b2e75 100644 --- a/fdbserver/Ratekeeper.actor.cpp +++ b/fdbserver/Ratekeeper.actor.cpp @@ -227,11 +227,6 @@ public: } } - ACTOR static Future monitorThrottlingChanges(Ratekeeper* self) { - wait(self->tagThrottler->monitorThrottlingChanges()); - return Void(); - } - ACTOR static Future run(RatekeeperInterface rkInterf, Reference const> dbInfo) { state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True)); state Future timeout = Void(); @@ -408,7 +403,7 @@ Future Ratekeeper::trackTLogQueueInfo(TLogInterface tli) { } Future Ratekeeper::monitorThrottlingChanges() { - return RatekeeperImpl::monitorThrottlingChanges(this); + return tagThrottler->monitorThrottlingChanges(); } Future Ratekeeper::run(RatekeeperInterface rkInterf, Reference const> dbInfo) { @@ -436,7 +431,11 @@ Ratekeeper::Ratekeeper(UID id, Database db) SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH, SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH, SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH) { - tagThrottler = std::make_unique(db, id); + if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) { + tagThrottler = std::make_unique(db, id); + } else { + tagThrottler = std::make_unique(db, id); + } } void Ratekeeper::updateCommitCostEstimation( diff --git a/fdbserver/RkTagThrottleCollection.cpp b/fdbserver/RkTagThrottleCollection.cpp index 11c376f57d..d0e8cb9892 100644 --- a/fdbserver/RkTagThrottleCollection.cpp +++ b/fdbserver/RkTagThrottleCollection.cpp @@ -22,7 +22,7 @@ #include "fdbserver/Knobs.h" #include "fdbserver/RkTagThrottleCollection.h" -double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional requestRate) { +double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional requestRate) const { if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) { return limits.tpsRate; } else { @@ -347,10 +347,12 @@ int64_t RkTagThrottleCollection::manualThrottleCount() const { return count; } -void RkTagThrottleCollection::updateBusyTagCount(TagThrottledReason reason) { +void RkTagThrottleCollection::incrementBusyTagCount(TagThrottledReason reason) { if (reason == TagThrottledReason::BUSY_READ) { ++busyReadTagCount; } else if (reason == TagThrottledReason::BUSY_WRITE) { ++busyWriteTagCount; + } else { + ASSERT(false); } } diff --git a/fdbserver/TagThrottler.actor.cpp b/fdbserver/TagThrottler.actor.cpp index d85e021544..432aa459b1 100644 --- a/fdbserver/TagThrottler.actor.cpp +++ b/fdbserver/TagThrottler.actor.cpp @@ -21,6 +21,7 @@ #include "fdbserver/TagThrottler.h" #include "fdbserver/RkTagThrottleCollection.h" +#include "flow/actorcompiler.h" // must be last include class TagThrottlerImpl { Database db; @@ -106,7 +107,7 @@ class TagThrottlerImpl { if (tagKey.throttleType == TagThrottleType::AUTO) { updatedTagThrottles.autoThrottleTag( self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime); - updatedTagThrottles.updateBusyTagCount(tagValue.reason); + updatedTagThrottles.incrementBusyTagCount(tagValue.reason); } else { updatedTagThrottles.manualThrottleTag(self->id, tag, @@ -143,6 +144,7 @@ class TagThrottlerImpl { if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) { TEST(true); // Transaction tag auto-throttled Optional clientRate = throttledTags.autoThrottleTag(id, tag, busyness); + // TODO: Increment tag throttle counts here? if (clientRate.present()) { TagSet tags; tags.addTag(tag); @@ -185,23 +187,21 @@ public: // the future auto storageQueue = ss.getStorageQueueBytes(); auto storageDurabilityLag = ss.getDurabilityLag(); + std::vector> futures; if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES || storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) { - // TODO: Update once size is potentially > 1 - ASSERT_WE_THINK(ss.busiestWriteTags.size() <= 1); - ASSERT_WE_THINK(ss.busiestReadTags.size() <= 1); for (const auto& busyWriteTag : ss.busiestWriteTags) { - return tryUpdateAutoThrottling(busyWriteTag.tag, - busyWriteTag.rate, - busyWriteTag.fractionalBusyness, - TagThrottledReason::BUSY_WRITE); + futures.push_back(tryUpdateAutoThrottling(busyWriteTag.tag, + busyWriteTag.rate, + busyWriteTag.fractionalBusyness, + TagThrottledReason::BUSY_WRITE)); } for (const auto& busyReadTag : ss.busiestReadTags) { - return tryUpdateAutoThrottling( - busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ); + futures.push_back(tryUpdateAutoThrottling( + busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ)); } } - return Void(); + return waitForAll(futures); } }; // class TagThrottlerImpl diff --git a/fdbserver/TransactionTagCounter.cpp b/fdbserver/TransactionTagCounter.cpp index 1f0a25c2cc..7b7829f676 100644 --- a/fdbserver/TransactionTagCounter.cpp +++ b/fdbserver/TransactionTagCounter.cpp @@ -18,50 +18,193 @@ * limitations under the License. */ +#include "fdbserver/Knobs.h" #include "fdbserver/TransactionTagCounter.h" #include "flow/Trace.h" -TransactionTagCounter::TransactionTagCounter(UID thisServerID) - : thisServerID(thisServerID), - busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} +namespace { -void TransactionTagCounter::addRequest(Optional const& tags, int64_t bytes) { - if (tags.present()) { - TEST(true); // Tracking transaction tag in counter - double cost = costFunction(bytes); - for (auto& tag : tags.get()) { - int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())]; - count += cost; - if (count > busiestTagCount) { - busiestTagCount = count; - busiestTag = tag; +class TopKTags { +public: + struct TagAndCount { + TransactionTag tag; + int64_t count; + bool operator<(TagAndCount const& other) const { return count < other.count; } + explicit TagAndCount(TransactionTag tag, int64_t count) : tag(tag), count(count) {} + }; + +private: + // Because the number of tracked is expected to be small, they can be tracked + // in a simple vector. If the number of tracked tags increases, a more sophisticated + // data structure will be required. + std::vector topTags; + int limit; + +public: + explicit TopKTags(int limit) : limit(limit) { + ASSERT_GT(limit, 0); + topTags.reserve(limit); + } + + void incrementCount(TransactionTag tag, int previousCount, int increase) { + auto iter = std::find_if(topTags.begin(), topTags.end(), [tag](const auto& tc) { return tc.tag == tag; }); + if (iter != topTags.end()) { + ASSERT_EQ(previousCount, iter->count); + iter->count += increase; + } else if (topTags.size() < limit) { + ASSERT_EQ(previousCount, 0); + topTags.emplace_back(tag, increase); + } else { + auto toReplace = std::min_element(topTags.begin(), topTags.end()); + ASSERT_GE(toReplace->count, previousCount); + if (toReplace->count < previousCount + increase) { + toReplace->tag = tag; + toReplace->count = previousCount + increase; } } - - intervalTotalSampledCount += cost; } + + std::vector getBusiestTags(double elapsed, double totalSampleCount) const { + std::vector result; + for (auto const& tagAndCounter : topTags) { + auto rate = (tagAndCounter.count / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE) / elapsed; + if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) { + result.emplace_back(tagAndCounter.tag, rate, tagAndCounter.count / totalSampleCount); + } + } + return result; + } + + void clear() { topTags.clear(); } +}; + +} // namespace + +class TransactionTagCounterImpl { + UID thisServerID; + TransactionTagMap intervalCounts; + int64_t intervalTotalSampledCount = 0; + TopKTags topTags; + double intervalStart = 0; + + std::vector previousBusiestTags; + Reference busiestReadTagEventHolder; + + static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; } + +public: + TransactionTagCounterImpl(UID thisServerID) + : thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED), + busiestReadTagEventHolder(makeReference(thisServerID.toString() + "/BusiestReadTag")) {} + + void addRequest(Optional const& tags, int64_t bytes) { + if (tags.present()) { + TEST(true); // Tracking transaction tag in counter + double cost = costFunction(bytes); + for (auto& tag : tags.get()) { + int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())]; + topTags.incrementCount(tag, count, cost); + count += cost; + } + + intervalTotalSampledCount += cost; + } + } + + void startNewInterval() { + double elapsed = now() - intervalStart; + previousBusiestTags.clear(); + if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) { + previousBusiestTags = topTags.getBusiestTags(elapsed, intervalTotalSampledCount); + + TraceEvent("BusiestReadTag", thisServerID) + .detail("Elapsed", elapsed) + //.detail("Tag", printable(busiestTag)) + //.detail("TagCost", busiestTagCount) + .detail("TotalSampledCost", intervalTotalSampledCount) + .detail("Reported", previousBusiestTags.size()) + .trackLatest(busiestReadTagEventHolder->trackingKey); + } + + intervalCounts.clear(); + intervalTotalSampledCount = 0; + topTags.clear(); + intervalStart = now(); + } + + std::vector const& getBusiestTags() const { return previousBusiestTags; } +}; + +TransactionTagCounter::TransactionTagCounter(UID thisServerID) + : impl(PImpl::create(thisServerID)) {} + +TransactionTagCounter::~TransactionTagCounter() = default; + +void TransactionTagCounter::addRequest(Optional const& tags, int64_t bytes) { + return impl->addRequest(tags, bytes); } void TransactionTagCounter::startNewInterval() { - double elapsed = now() - intervalStart; - previousBusiestTags.clear(); - if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) { - double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed; - if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) { - previousBusiestTags.emplace_back(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount); - } - - TraceEvent("BusiestReadTag", thisServerID) - .detail("Elapsed", elapsed) - .detail("Tag", printable(busiestTag)) - .detail("TagCost", busiestTagCount) - .detail("TotalSampledCost", intervalTotalSampledCount) - .detail("Reported", !previousBusiestTags.empty()) - .trackLatest(busiestReadTagEventHolder->trackingKey); - } - - intervalCounts.clear(); - intervalTotalSampledCount = 0; - busiestTagCount = 0; - intervalStart = now(); + return impl->startNewInterval(); +} + +std::vector const& TransactionTagCounter::getBusiestTags() const { + return impl->getBusiestTags(); +} + +TEST_CASE("/TransactionTagCounter/TopKTags") { + TopKTags topTags(2); + + // Ensure that costs are larger enough to show up + auto const costMultiplier = + std::max(1.0, 2 * SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->READ_TAG_SAMPLE_RATE); + + ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0); + topTags.incrementCount("a"_sr, 0, 1 * costMultiplier); + { + auto const busiestTags = topTags.getBusiestTags(1.0, 1 * costMultiplier); + ASSERT_EQ(busiestTags.size(), 1); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }), + 1); + } + topTags.incrementCount("b"_sr, 0, 2 * costMultiplier); + topTags.incrementCount("c"_sr, 0, 3 * costMultiplier); + { + auto busiestTags = topTags.getBusiestTags(1.0, 6 * costMultiplier); + ASSERT_EQ(busiestTags.size(), 2); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }), + 0); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }), + 1); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }), + 1); + } + topTags.incrementCount("a"_sr, 1 * costMultiplier, 3 * costMultiplier); + { + auto busiestTags = topTags.getBusiestTags(1.0, 9 * costMultiplier); + ASSERT_EQ(busiestTags.size(), 2); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }), + 1); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }), + 0); + ASSERT_EQ(std::count_if(busiestTags.begin(), + busiestTags.end(), + [](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }), + 1); + } + topTags.clear(); + ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0); + return Void(); } diff --git a/fdbserver/include/fdbserver/Ratekeeper.h b/fdbserver/include/fdbserver/Ratekeeper.h index c0b1769c90..948cca851b 100644 --- a/fdbserver/include/fdbserver/Ratekeeper.h +++ b/fdbserver/include/fdbserver/Ratekeeper.h @@ -148,7 +148,7 @@ class Ratekeeper { double lastWarning; double lastSSListFetchedTimestamp; - std::unique_ptr tagThrottler; + std::unique_ptr tagThrottler; RatekeeperLimits normalLimits; RatekeeperLimits batchLimits; diff --git a/fdbserver/include/fdbserver/RkTagThrottleCollection.h b/fdbserver/include/fdbserver/RkTagThrottleCollection.h index 35062cdb7c..ee064685fa 100644 --- a/fdbserver/include/fdbserver/RkTagThrottleCollection.h +++ b/fdbserver/include/fdbserver/RkTagThrottleCollection.h @@ -42,7 +42,7 @@ class RkTagThrottleCollection : NonCopyable { bool rateSet = false; RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {} - double getTargetRate(Optional requestRate); + double getTargetRate(Optional requestRate) const; Optional updateAndGetClientRate(Optional requestRate); }; @@ -83,7 +83,7 @@ public: void addRequests(TransactionTag const& tag, int requests); int64_t autoThrottleCount() const { return autoThrottledTags.size(); } int64_t manualThrottleCount() const; - void updateBusyTagCount(TagThrottledReason); + void incrementBusyTagCount(TagThrottledReason); auto getBusyReadTagCount() const { return busyReadTagCount; } auto getBusyWriteTagCount() const { return busyWriteTagCount; } }; diff --git a/fdbserver/include/fdbserver/TagThrottler.h b/fdbserver/include/fdbserver/TagThrottler.h index 69e3909c7d..830c2fea06 100644 --- a/fdbserver/include/fdbserver/TagThrottler.h +++ b/fdbserver/include/fdbserver/TagThrottler.h @@ -23,32 +23,72 @@ #include "fdbclient/PImpl.h" #include "fdbserver/Ratekeeper.h" -class TagThrottler { +class ITagThrottler { +public: + virtual ~ITagThrottler() = default; + + // Poll the system keyspace looking for updates made through the tag throttling API + virtual Future monitorThrottlingChanges() = 0; + + // Increment the number of known requests associated with the specified tag + virtual void addRequests(TransactionTag tag, int count) = 0; + + // This throttled tag change ID is used to coordinate updates with the GRV proxies + virtual uint64_t getThrottledTagChangeId() const = 0; + + // For each tag and priority combination, return the throughput limit and expiration time + // Also, erase expired tags + virtual PrioritizedTransactionTagMap getClientRates() = 0; + + virtual int64_t autoThrottleCount() const = 0; + virtual uint32_t busyReadTagCount() const = 0; + virtual uint32_t busyWriteTagCount() const = 0; + virtual int64_t manualThrottleCount() const = 0; + virtual bool isAutoThrottlingEnabled() const = 0; + + // Based on the busiest read and write tags in the provided storage queue info, update + // tag throttling limits. + virtual Future tryUpdateAutoThrottling(StorageQueueInfo const&) = 0; +}; + +class TagThrottler : public ITagThrottler { PImpl impl; public: TagThrottler(Database db, UID id); ~TagThrottler(); - // Poll the system keyspace looking for updates made through the tag throttling API - Future monitorThrottlingChanges(); - - // Increment the number of known requests associated with the specified tag - void addRequests(TransactionTag tag, int count); - - // This throttled tag change ID is used to coordinate updates with the GRV proxies - uint64_t getThrottledTagChangeId() const; - - // For each tag and priority combination, return the throughput limit and expiration time - PrioritizedTransactionTagMap getClientRates(); - - int64_t autoThrottleCount() const; - uint32_t busyReadTagCount() const; - uint32_t busyWriteTagCount() const; - int64_t manualThrottleCount() const; - bool isAutoThrottlingEnabled() const; - - // Based on the busiest read and write tags in the provided storage queue info, update - // tag throttling limits. - Future tryUpdateAutoThrottling(StorageQueueInfo const&); + Future monitorThrottlingChanges() override; + void addRequests(TransactionTag tag, int count) override; + uint64_t getThrottledTagChangeId() const override; + PrioritizedTransactionTagMap getClientRates() override; + int64_t autoThrottleCount() const override; + uint32_t busyReadTagCount() const override; + uint32_t busyWriteTagCount() const override; + int64_t manualThrottleCount() const override; + bool isAutoThrottlingEnabled() const override; + Future tryUpdateAutoThrottling(StorageQueueInfo const&) override; +}; + +class GlobalTagThrottler : public ITagThrottler { + PImpl impl; + +public: + GlobalTagThrottler(Database db, UID id); + ~GlobalTagThrottler(); + + Future monitorThrottlingChanges() override; + void addRequests(TransactionTag tag, int count) override; + uint64_t getThrottledTagChangeId() const override; + PrioritizedTransactionTagMap getClientRates() override; + int64_t autoThrottleCount() const override; + uint32_t busyReadTagCount() const override; + uint32_t busyWriteTagCount() const override; + int64_t manualThrottleCount() const override; + bool isAutoThrottlingEnabled() const override; + Future tryUpdateAutoThrottling(StorageQueueInfo const&) override; + + // testing only +public: + void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&); }; diff --git a/fdbserver/include/fdbserver/TransactionTagCounter.h b/fdbserver/include/fdbserver/TransactionTagCounter.h index d520259c5c..6e2b424e6f 100644 --- a/fdbserver/include/fdbserver/TransactionTagCounter.h +++ b/fdbserver/include/fdbserver/TransactionTagCounter.h @@ -20,25 +20,23 @@ #pragma once +#include "fdbclient/PImpl.h" #include "fdbclient/StorageServerInterface.h" #include "fdbclient/TagThrottle.actor.h" -#include "fdbserver/Knobs.h" class TransactionTagCounter { - TransactionTagMap intervalCounts; - int64_t intervalTotalSampledCount = 0; - TransactionTag busiestTag; - int64_t busiestTagCount = 0; - double intervalStart = 0; - - std::vector previousBusiestTags; - UID thisServerID; - Reference busiestReadTagEventHolder; + PImpl impl; public: TransactionTagCounter(UID thisServerID); - static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; } + ~TransactionTagCounter(); + + // Update counters tracking the busyness of each tag in the current interval void addRequest(Optional const& tags, int64_t bytes); + + // Save current set of busy tags and reset counters for next interval void startNewInterval(); - std::vector const& getBusiestTags() const { return previousBusiestTags; } + + // Returns the set of busiest tags as of the end of the last interval + std::vector const& getBusiestTags() const; }; diff --git a/fdbserver/workloads/GlobalTagThrottling.actor.cpp b/fdbserver/workloads/GlobalTagThrottling.actor.cpp new file mode 100644 index 0000000000..41ea8a630e --- /dev/null +++ b/fdbserver/workloads/GlobalTagThrottling.actor.cpp @@ -0,0 +1,74 @@ +/* + * GlobalTagThrottling.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/TagThrottle.actor.h" +#include "fdbserver/workloads/workloads.actor.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +class GlobalTagThrottlingWorkload : public TestWorkload { + TransactionTag transactionTag; + double reservedReadQuota{ 0.0 }; + double totalReadQuota{ 0.0 }; + double reservedWriteQuota{ 0.0 }; + double totalWriteQuota{ 0.0 }; + + ACTOR static Future setup(GlobalTagThrottlingWorkload* self, Database cx) { + state Reference tr = makeReference(cx); + loop { + try { + tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + TraceEvent("GlobalTagThrottlingWorkload_SettingTagQuota") + .detail("Tag", self->transactionTag) + .detail("ReservedReadQuota", self->reservedReadQuota) + .detail("TotalReadQuota", self->totalReadQuota) + .detail("ReservedWriteQuota", self->reservedWriteQuota) + .detail("TotalWriteQuota", self->totalWriteQuota); + ThrottleApi::setTagQuota(tr, + self->transactionTag, + self->reservedReadQuota, + self->totalReadQuota, + self->reservedWriteQuota, + self->totalWriteQuota); + wait(tr->commit()); + return Void(); + } catch (Error& e) { + wait(tr->onError(e)); + } + }; + } + +public: + explicit GlobalTagThrottlingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) { + transactionTag = getOption(options, "transactionTag"_sr, "sampleTag"_sr); + reservedReadQuota = getOption(options, "reservedReadQuota"_sr, 0.0); + totalReadQuota = getOption(options, "totalReadQuota"_sr, 0.0); + reservedWriteQuota = getOption(options, "reservedWriteQuota"_sr, 0.0); + totalWriteQuota = getOption(options, "totalWriteQuota"_sr, 0.0); + } + + std::string description() const override { return "GlobalTagThrottling"; } + Future setup(Database const& cx) override { return clientId ? Void() : setup(this, cx); } + Future start(Database const& cx) override { return Void(); } + Future check(Database const& cx) override { return true; } + void getMetrics(std::vector& m) override {} +}; + +WorkloadFactory GlobalTagThrottlingWorkloadFactory("GlobalTagThrottling"); diff --git a/fdbserver/workloads/ReadWrite.actor.cpp b/fdbserver/workloads/ReadWrite.actor.cpp index 475c3a023c..ef77eaae76 100644 --- a/fdbserver/workloads/ReadWrite.actor.cpp +++ b/fdbserver/workloads/ReadWrite.actor.cpp @@ -61,6 +61,7 @@ struct ReadWriteCommonImpl { throw; } } + ACTOR static Future tracePeriodically(ReadWriteCommon* self) { state double start = now(); state double elapsed = 0.0; @@ -376,6 +377,9 @@ struct ReadWriteWorkload : ReadWriteCommon { bool adjacentReads; // keys are adjacent within a transaction bool adjacentWrites; int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction; + Optional transactionTag; + + int transactionsTagThrottled{ 0 }; // hot traffic pattern double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting @@ -397,6 +401,9 @@ struct ReadWriteWorkload : ReadWriteCommon { rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false); batchPriority = getOption(options, LiteralStringRef("batchPriority"), false); descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite")); + if (hasOption(options, LiteralStringRef("transactionTag"))) { + transactionTag = getOption(options, LiteralStringRef("transactionTag"), ""_sr); + } if (rampUpConcurrency) ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down @@ -415,15 +422,18 @@ struct ReadWriteWorkload : ReadWriteCommon { } } - std::string description() const override { return descriptionString.toString(); } - template - void setupTransaction(Trans* tr) { + void setupTransaction(Trans& tr) { if (batchPriority) { - tr->setOption(FDBTransactionOptions::PRIORITY_BATCH); + tr.setOption(FDBTransactionOptions::PRIORITY_BATCH); + } + if (transactionTag.present() && tr.getTags().size() == 0) { + tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, transactionTag.get()); } } + std::string description() const override { return descriptionString.toString(); } + void getMetrics(std::vector& m) override { ReadWriteCommon::getMetrics(m); if (!rampUpLoad) { @@ -449,6 +459,9 @@ struct ReadWriteWorkload : ReadWriteCommon { m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True); m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True); m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True); + if (transactionTag.present()) { + m.emplace_back("Transaction Tag Throttled", transactionsTagThrottled, Averaged::False); + } } } @@ -494,11 +507,14 @@ struct ReadWriteWorkload : ReadWriteCommon { state Transaction tr(cx); try { - self->setupTransaction(&tr); + self->setupTransaction(tr); wait(self->readOp(&tr, keys, self, false)); wait(tr.warmRange(allKeys)); break; } catch (Error& e) { + if (e.code() == error_code_tag_throttled) { + ++self->transactionsTagThrottled; + } wait(tr.onError(e)); } } @@ -625,7 +641,7 @@ struct ReadWriteWorkload : ReadWriteCommon { loop { try { - self->setupTransaction(&tr); + self->setupTransaction(tr); GRVStartTime = now(); self->transactionFailureMetric->startLatency = -1; diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index f296bc943a..46e0d81c14 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -196,7 +196,7 @@ ERROR( key_not_tuple, 2041, "The key cannot be parsed as a tuple" ); ERROR( value_not_tuple, 2042, "The value cannot be parsed as a tuple" ); ERROR( mapper_not_tuple, 2043, "The mapper cannot be parsed as a tuple" ); ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" ) - +ERROR( invalid_throttle_quota_value, 2045, "Failed to deserialize or initialize throttle quota value" ) ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" ) ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" ) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index dcdb46ffb3..5e16edebd5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -208,6 +208,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES rare/CycleWithKills.toml) add_fdb_test(TEST_FILES rare/CycleWithDeadHall.toml) add_fdb_test(TEST_FILES rare/FuzzTest.toml) + add_fdb_test(TEST_FILES rare/GlobalTagThrottling.toml IGNORE) add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml) add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml) add_fdb_test(TEST_FILES rare/LargeApiCorrectness.toml) diff --git a/tests/rare/GlobalTagThrottling.toml b/tests/rare/GlobalTagThrottling.toml new file mode 100644 index 0000000000..58cda2312e --- /dev/null +++ b/tests/rare/GlobalTagThrottling.toml @@ -0,0 +1,41 @@ +[[test]] +testTitle='GlobalTagThrottling' + + [[test.knobs]] + min_tag_read_pages_rate=1.0 + global_tag_throttling=true + + [[test.workload]] + testName='GlobalTagThrottling' + transactionTag='sampleTag1' + totalReadQuota=1.0 + + [[test.workload]] + testName='ReadWrite' + testDuration=600.0 + transactionsPerSecond=100 + writesPerTransactionA=0 + readsPerTransactionA=10 + writesPerTransactionB=0 + readsPerTransactionB=0 + alpha=0.0 + nodeCount=10000 + valueBytes=1000 + minValueBytes=1000 + warmingDelay=60.0 + transactionTag='sampleTag1' + + [[test.workload]] + testName='ReadWrite' + testDuration=600.0 + transactionsPerSecond=100 + writesPerTransactionA=0 + readsPerTransactionA=10 + writesPerTransactionB=0 + readsPerTransactionB=0 + alpha=0.0 + nodeCount=10000 + valueBytes=1000 + minValueBytes=1000 + warmingDelay=60.0 + transactionTag='sampleTag2'