Merge pull request #7112 from sfc-gh-tclinkenbeard/global-tag-throttling3
Create Global Tag Throttler
This commit is contained in:
commit
db769667ae
|
@ -0,0 +1,178 @@
|
|||
/*
|
||||
* QuotaCommand.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbcli/fdbcli.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last include
|
||||
|
||||
namespace {
|
||||
|
||||
enum class LimitType { RESERVED, TOTAL };
|
||||
|
||||
enum class OpType { READ, WRITE };
|
||||
|
||||
Optional<TransactionTag> parseTag(StringRef token) {
|
||||
if (token.size() > CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH) {
|
||||
return {};
|
||||
} else {
|
||||
return token;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<LimitType> parseLimitType(StringRef token) {
|
||||
if (token == "reserved"_sr) {
|
||||
return LimitType::RESERVED;
|
||||
} else if (token == "total"_sr) {
|
||||
return LimitType::TOTAL;
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
Optional<OpType> parseOpType(StringRef token) {
|
||||
if (token == "read"_sr) {
|
||||
return OpType::READ;
|
||||
} else if (token == "write"_sr) {
|
||||
return OpType::WRITE;
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
Optional<double> parseLimitValue(StringRef token) {
|
||||
try {
|
||||
return std::stod(token.toString());
|
||||
} catch (...) {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, OpType opType) {
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
|
||||
try {
|
||||
state ThreadFuture<Optional<Value>> resultFuture = tr->get(tag.withPrefix(tagQuotaPrefix));
|
||||
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
|
||||
if (!v.present()) {
|
||||
fmt::print("<empty>\n");
|
||||
} else {
|
||||
auto const quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
|
||||
if (limitType == LimitType::TOTAL && opType == OpType::READ) {
|
||||
fmt::print("{}\n", quota.totalReadQuota);
|
||||
} else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) {
|
||||
fmt::print("{}\n", quota.totalWriteQuota);
|
||||
} else if (limitType == LimitType::RESERVED && opType == OpType::READ) {
|
||||
fmt::print("{}\n", quota.reservedReadQuota);
|
||||
} else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) {
|
||||
fmt::print("{}\n", quota.reservedWriteQuota);
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> setQuota(Reference<IDatabase> db,
|
||||
TransactionTag tag,
|
||||
LimitType limitType,
|
||||
OpType opType,
|
||||
double value) {
|
||||
state Reference<ITransaction> tr = db->createTransaction();
|
||||
state Key key = tag.withPrefix(tagQuotaPrefix);
|
||||
loop {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
try {
|
||||
state ThreadFuture<Optional<Value>> resultFuture = tr->get(key);
|
||||
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
|
||||
ThrottleApi::TagQuotaValue quota;
|
||||
if (v.present()) {
|
||||
quota = ThrottleApi::TagQuotaValue::fromValue(v.get());
|
||||
}
|
||||
if (limitType == LimitType::TOTAL && opType == OpType::READ) {
|
||||
quota.totalReadQuota = value;
|
||||
} else if (limitType == LimitType::TOTAL && opType == OpType::WRITE) {
|
||||
quota.totalWriteQuota = value;
|
||||
} else if (limitType == LimitType::RESERVED && opType == OpType::READ) {
|
||||
quota.reservedReadQuota = value;
|
||||
} else if (limitType == LimitType::RESERVED && opType == OpType::WRITE) {
|
||||
quota.reservedWriteQuota = value;
|
||||
}
|
||||
ThrottleApi::setTagQuota(tr,
|
||||
tag,
|
||||
quota.reservedReadQuota,
|
||||
quota.totalReadQuota,
|
||||
quota.reservedWriteQuota,
|
||||
quota.totalWriteQuota);
|
||||
wait(safeThreadFutureToFuture(tr->commit()));
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(safeThreadFutureToFuture(tr->onError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
constexpr auto usage =
|
||||
"quota [get <tag> [reserved|total] [read|write]|set <tag> [reserved|total] [read|write] <value>]";
|
||||
|
||||
bool exitFailure() {
|
||||
fmt::print(usage);
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace fdb_cli {
|
||||
|
||||
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
|
||||
state bool result = true;
|
||||
if (tokens.size() != 5 && tokens.size() != 6) {
|
||||
return exitFailure();
|
||||
} else {
|
||||
auto tag = parseTag(tokens[2]);
|
||||
auto limitType = parseLimitType(tokens[3]);
|
||||
auto opType = parseOpType(tokens[4]);
|
||||
if (!tag.present() || !limitType.present() || !opType.present()) {
|
||||
return exitFailure();
|
||||
}
|
||||
if (tokens[1] == "get"_sr) {
|
||||
if (tokens.size() != 5) {
|
||||
return exitFailure();
|
||||
}
|
||||
wait(getQuota(db, tag.get(), limitType.get(), opType.get()));
|
||||
return true;
|
||||
} else if (tokens[1] == "set"_sr) {
|
||||
if (tokens.size() != 6) {
|
||||
return exitFailure();
|
||||
}
|
||||
auto const limitValue = parseLimitValue(tokens[5]);
|
||||
if (!limitValue.present()) {
|
||||
return exitFailure();
|
||||
}
|
||||
wait(setQuota(db, tag.get(), limitType.get(), opType.get(), limitValue.get()));
|
||||
return true;
|
||||
} else {
|
||||
return exitFailure();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace fdb_cli
|
|
@ -509,6 +509,10 @@ void initHelp() {
|
|||
CommandHelp("getversion",
|
||||
"Fetch the current read version",
|
||||
"Displays the current read version of the database or currently running transaction.");
|
||||
helpMap["quota"] =
|
||||
CommandHelp("quota",
|
||||
"quota [get <tag> [reserved|total] [read|write]|set <tag> [reserved|total] [read|write] <value>]",
|
||||
"Get or modify the throughput quota for the specified tag.");
|
||||
helpMap["reset"] =
|
||||
CommandHelp("reset",
|
||||
"reset the current transaction",
|
||||
|
@ -1468,6 +1472,14 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (tokencmp(tokens[0], "quota")) {
|
||||
bool _result = wait(makeInterruptable(quotaCommandActor(db, tokens)));
|
||||
if (!_result) {
|
||||
is_error = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tokencmp(tokens[0], "reset")) {
|
||||
if (tokens.size() != 1) {
|
||||
printUsage(tokens[0]);
|
||||
|
|
|
@ -218,6 +218,8 @@ ACTOR Future<bool> profileCommandActor(Database db,
|
|||
Reference<ITransaction> tr,
|
||||
std::vector<StringRef> tokens,
|
||||
bool intrans);
|
||||
// quota command
|
||||
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// setclass command
|
||||
ACTOR Future<bool> setClassCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens);
|
||||
// snapshot command
|
||||
|
|
|
@ -5151,8 +5151,9 @@ Future<Optional<Value>> Transaction::get(const Key& key, Snapshot snapshot) {
|
|||
if (!ver.isReady() || metadataVersion.isSet()) {
|
||||
return metadataVersion.getFuture();
|
||||
} else {
|
||||
if (ver.isError())
|
||||
if (ver.isError()) {
|
||||
return ver.getError();
|
||||
}
|
||||
if (ver.get() == trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].first) {
|
||||
return trState->cx->metadataVersionCache[trState->cx->mvCacheInsertLocation].second;
|
||||
}
|
||||
|
@ -5756,6 +5757,10 @@ void Transaction::resetImpl(bool generateNewSpan) {
|
|||
cancelWatches();
|
||||
}
|
||||
|
||||
TagSet const& Transaction::getTags() const {
|
||||
return trState->options.tags;
|
||||
}
|
||||
|
||||
void Transaction::reset() {
|
||||
resetImpl(false);
|
||||
}
|
||||
|
@ -7060,6 +7065,25 @@ Future<ProtocolVersion> DatabaseContext::getClusterProtocol(Optional<ProtocolVer
|
|||
return getClusterProtocolImpl(coordinator, expectedVersion);
|
||||
}
|
||||
|
||||
double ClientTagThrottleData::throttleDuration() const {
|
||||
if (expiration <= now()) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
double capacity =
|
||||
(smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
|
||||
|
||||
if (capacity >= 1) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
if (tpsRate == 0) {
|
||||
return std::max(0.0, expiration - now());
|
||||
}
|
||||
|
||||
return std::min(expiration - now(), capacity / tpsRate);
|
||||
}
|
||||
|
||||
uint32_t Transaction::getSize() {
|
||||
auto s = tr.transaction.mutations.expectedSize() + tr.transaction.read_conflict_ranges.expectedSize() +
|
||||
tr.transaction.write_conflict_ranges.expectedSize();
|
||||
|
|
|
@ -662,6 +662,11 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
|
|||
init( AUTO_TAG_THROTTLE_UPDATE_FREQUENCY, 10.0 ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLE_UPDATE_FREQUENCY = 0.5;
|
||||
init( TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL, 30.0 ); if(randomize && BUGGIFY) TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL = 1.0;
|
||||
init( AUTO_TAG_THROTTLING_ENABLED, true ); if(randomize && BUGGIFY) AUTO_TAG_THROTTLING_ENABLED = false;
|
||||
init( SS_THROTTLE_TAGS_TRACKED, 1 ); if(randomize && BUGGIFY) SS_THROTTLE_TAGS_TRACKED = deterministicRandom()->randomInt(1, 10);
|
||||
init( GLOBAL_TAG_THROTTLING, false );
|
||||
init( GLOBAL_TAG_THROTTLING_MIN_RATE, 1.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_FOLDING_TIME, 10.0 );
|
||||
init( GLOBAL_TAG_THROTTLING_TRACE_INTERVAL, 5.0 );
|
||||
|
||||
//Storage Metrics
|
||||
init( STORAGE_METRICS_AVERAGE_INTERVAL, 120.0 );
|
||||
|
|
|
@ -18,12 +18,16 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "fdbclient/Tuple.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
|
||||
double const ClientTagThrottleLimits::NO_EXPIRATION = std::numeric_limits<double>::max();
|
||||
|
||||
void TagSet::addTag(TransactionTagRef tag) {
|
||||
ASSERT(CLIENT_KNOBS->MAX_TRANSACTION_TAG_LENGTH < 256); // Tag length is encoded with a single byte
|
||||
ASSERT(CLIENT_KNOBS->MAX_TAGS_PER_TRANSACTION < 256); // Number of tags is encoded with a single byte
|
||||
|
@ -124,6 +128,53 @@ TagThrottleValue TagThrottleValue::fromValue(const ValueRef& value) {
|
|||
return throttleValue;
|
||||
}
|
||||
|
||||
KeyRangeRef const tagQuotaKeys = KeyRangeRef("\xff/tagQuota/"_sr, "\xff/tagQuota0"_sr);
|
||||
KeyRef const tagQuotaPrefix = tagQuotaKeys.begin;
|
||||
|
||||
Key ThrottleApi::getTagQuotaKey(TransactionTagRef tag) {
|
||||
return tag.withPrefix(tagQuotaPrefix);
|
||||
}
|
||||
|
||||
bool ThrottleApi::TagQuotaValue::isValid() const {
|
||||
return reservedReadQuota <= totalReadQuota && reservedWriteQuota <= totalWriteQuota && reservedReadQuota >= 0 &&
|
||||
reservedWriteQuota >= 0;
|
||||
}
|
||||
|
||||
Value ThrottleApi::TagQuotaValue::toValue() const {
|
||||
Tuple tuple;
|
||||
tuple.appendDouble(reservedReadQuota);
|
||||
tuple.appendDouble(totalReadQuota);
|
||||
tuple.appendDouble(reservedWriteQuota);
|
||||
tuple.appendDouble(totalWriteQuota);
|
||||
return tuple.pack();
|
||||
}
|
||||
|
||||
ThrottleApi::TagQuotaValue ThrottleApi::TagQuotaValue::fromValue(ValueRef value) {
|
||||
auto tuple = Tuple::unpack(value);
|
||||
if (tuple.size() != 4) {
|
||||
throw invalid_throttle_quota_value();
|
||||
}
|
||||
TagQuotaValue result;
|
||||
try {
|
||||
result.reservedReadQuota = tuple.getDouble(0);
|
||||
result.totalReadQuota = tuple.getDouble(1);
|
||||
result.reservedWriteQuota = tuple.getDouble(2);
|
||||
result.totalWriteQuota = tuple.getDouble(3);
|
||||
} catch (Error& e) {
|
||||
TraceEvent(SevWarnAlways, "TagQuotaValueFailedToDeserialize").error(e);
|
||||
throw invalid_throttle_quota_value();
|
||||
}
|
||||
if (!result.isValid()) {
|
||||
TraceEvent(SevWarnAlways, "TagQuotaValueInvalidQuotas")
|
||||
.detail("ReservedReadQuota", result.reservedReadQuota)
|
||||
.detail("TotalReadQuota", result.totalReadQuota)
|
||||
.detail("ReservedWriteQuota", result.reservedWriteQuota)
|
||||
.detail("TotalWriteQuota", result.totalWriteQuota);
|
||||
throw invalid_throttle_quota_value();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
FDB_DEFINE_BOOLEAN_PARAM(ContainsRecommended);
|
||||
FDB_DEFINE_BOOLEAN_PARAM(Capitalize);
|
||||
|
||||
|
|
|
@ -116,23 +116,7 @@ public:
|
|||
|
||||
bool canRecheck() const { return lastCheck < now() - CLIENT_KNOBS->TAG_THROTTLE_RECHECK_INTERVAL; }
|
||||
|
||||
double throttleDuration() const {
|
||||
if (expiration <= now()) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
double capacity =
|
||||
(smoothRate.smoothTotal() - smoothReleased.smoothRate()) * CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW;
|
||||
if (capacity >= 1) {
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
if (tpsRate == 0) {
|
||||
return std::max(0.0, expiration - now());
|
||||
}
|
||||
|
||||
return std::min(expiration - now(), capacity / tpsRate);
|
||||
}
|
||||
double throttleDuration() const;
|
||||
};
|
||||
|
||||
struct WatchParameters : public ReferenceCounted<WatchParameters> {
|
||||
|
|
|
@ -465,6 +465,7 @@ public:
|
|||
|
||||
Reference<TransactionState> trState;
|
||||
std::vector<Reference<Watch>> watches;
|
||||
TagSet const& getTags() const;
|
||||
Span span;
|
||||
|
||||
// used in template functions as returned Future type
|
||||
|
|
|
@ -196,6 +196,7 @@ public:
|
|||
Transaction& getTransaction() { return tr; }
|
||||
|
||||
Optional<TenantName> getTenant() { return tr.getTenant(); }
|
||||
TagSet const& getTags() const { return tr.getTags(); }
|
||||
|
||||
// used in template functions as returned Future type
|
||||
template <typename Type>
|
||||
|
|
|
@ -564,6 +564,7 @@ public:
|
|||
int64_t TLOG_RECOVER_MEMORY_LIMIT;
|
||||
double TLOG_IGNORE_POP_AUTO_ENABLE_DELAY;
|
||||
|
||||
// Tag throttling
|
||||
int64_t MAX_MANUAL_THROTTLED_TRANSACTION_TAGS;
|
||||
int64_t MAX_AUTO_THROTTLED_TRANSACTION_TAGS;
|
||||
double MIN_TAG_COST;
|
||||
|
@ -576,6 +577,17 @@ public:
|
|||
double AUTO_TAG_THROTTLE_UPDATE_FREQUENCY;
|
||||
double TAG_THROTTLE_EXPIRED_CLEANUP_INTERVAL;
|
||||
bool AUTO_TAG_THROTTLING_ENABLED;
|
||||
// Limit to the number of throttling tags each storage server
|
||||
// will track and send to the ratekeeper
|
||||
int64_t SS_THROTTLE_TAGS_TRACKED;
|
||||
// Use global tag throttling strategy. i.e. throttle based on the cluster-wide
|
||||
// throughput for tags and their associated quotas.
|
||||
bool GLOBAL_TAG_THROTTLING;
|
||||
// Minimum number of transactions per second that the global tag throttler must allow for each tag
|
||||
double GLOBAL_TAG_THROTTLING_MIN_RATE;
|
||||
// Used by global tag throttling counters
|
||||
double GLOBAL_TAG_THROTTLING_FOLDING_TIME;
|
||||
double GLOBAL_TAG_THROTTLING_TRACE_INTERVAL;
|
||||
|
||||
double MAX_TRANSACTIONS_PER_BYTE;
|
||||
|
||||
|
|
|
@ -389,6 +389,8 @@ extern const KeyRef tagThrottleSignalKey;
|
|||
extern const KeyRef tagThrottleAutoEnabledKey;
|
||||
extern const KeyRef tagThrottleLimitKey;
|
||||
extern const KeyRef tagThrottleCountKey;
|
||||
extern const KeyRangeRef tagQuotaKeys;
|
||||
extern const KeyRef tagQuotaPrefix;
|
||||
|
||||
// Log Range constant variables
|
||||
// Used in the backup pipeline to track mutations
|
||||
|
|
|
@ -207,6 +207,8 @@ struct ClientTagThrottleLimits {
|
|||
double tpsRate;
|
||||
double expiration;
|
||||
|
||||
static double const NO_EXPIRATION;
|
||||
|
||||
ClientTagThrottleLimits() : tpsRate(0), expiration(0) {}
|
||||
ClientTagThrottleLimits(double tpsRate, double expiration) : tpsRate(tpsRate), expiration(expiration) {}
|
||||
|
||||
|
@ -595,6 +597,38 @@ Future<Void> enableAuto(Reference<DB> db, bool enabled) {
|
|||
}
|
||||
}
|
||||
|
||||
class TagQuotaValue {
|
||||
public:
|
||||
double reservedReadQuota{ 0.0 };
|
||||
double totalReadQuota{ 0.0 };
|
||||
double reservedWriteQuota{ 0.0 };
|
||||
double totalWriteQuota{ 0.0 };
|
||||
bool isValid() const;
|
||||
Value toValue() const;
|
||||
static TagQuotaValue fromValue(ValueRef);
|
||||
};
|
||||
|
||||
Key getTagQuotaKey(TransactionTagRef);
|
||||
|
||||
template <class Tr>
|
||||
void setTagQuota(Reference<Tr> tr,
|
||||
TransactionTagRef tag,
|
||||
double reservedReadQuota,
|
||||
double totalReadQuota,
|
||||
double reservedWriteQuota,
|
||||
double totalWriteQuota) {
|
||||
TagQuotaValue tagQuotaValue;
|
||||
tagQuotaValue.reservedReadQuota = reservedReadQuota;
|
||||
tagQuotaValue.totalReadQuota = totalReadQuota;
|
||||
tagQuotaValue.reservedWriteQuota = reservedWriteQuota;
|
||||
tagQuotaValue.totalWriteQuota = totalWriteQuota;
|
||||
if (!tagQuotaValue.isValid()) {
|
||||
throw invalid_throttle_quota_value();
|
||||
}
|
||||
tr->set(getTagQuotaKey(tag), tagQuotaValue.toValue());
|
||||
signalThrottleChange(tr);
|
||||
}
|
||||
|
||||
}; // namespace ThrottleApi
|
||||
|
||||
template <class Value>
|
||||
|
|
|
@ -0,0 +1,533 @@
|
|||
/*
|
||||
* GlobalTagThrottler.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "fdbrpc/Smoother.h"
|
||||
#include "fdbserver/TagThrottler.h"
|
||||
|
||||
#include <limits>
|
||||
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
class GlobalTagThrottlerImpl {
|
||||
class QuotaAndCounters {
|
||||
Optional<ThrottleApi::TagQuotaValue> quota;
|
||||
std::unordered_map<UID, double> ssToReadCostRate;
|
||||
std::unordered_map<UID, double> ssToWriteCostRate;
|
||||
Smoother totalReadCostRate;
|
||||
Smoother totalWriteCostRate;
|
||||
Smoother transactionCounter;
|
||||
Smoother perClientRate;
|
||||
|
||||
Optional<double> getReadTPSLimit() const {
|
||||
if (totalReadCostRate.smoothTotal() > 0) {
|
||||
return quota.get().totalReadQuota * transactionCounter.smoothRate() / totalReadCostRate.smoothTotal();
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
Optional<double> getWriteTPSLimit() const {
|
||||
if (totalWriteCostRate.smoothTotal() > 0) {
|
||||
return quota.get().totalWriteQuota * transactionCounter.smoothRate() / totalWriteCostRate.smoothTotal();
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
QuotaAndCounters()
|
||||
: totalReadCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
totalWriteCostRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
transactionCounter(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME),
|
||||
perClientRate(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_FOLDING_TIME) {}
|
||||
|
||||
void setQuota(ThrottleApi::TagQuotaValue const& quota) { this->quota = quota; }
|
||||
|
||||
void updateReadCostRate(UID ssId, double newReadCostRate) {
|
||||
auto& currentReadCostRate = ssToReadCostRate[ssId];
|
||||
auto diff = newReadCostRate - currentReadCostRate;
|
||||
currentReadCostRate += diff;
|
||||
totalReadCostRate.addDelta(diff);
|
||||
}
|
||||
|
||||
void updateWriteCostRate(UID ssId, double newWriteCostRate) {
|
||||
auto& currentWriteCostRate = ssToWriteCostRate[ssId];
|
||||
auto diff = newWriteCostRate - currentWriteCostRate;
|
||||
currentWriteCostRate += diff;
|
||||
totalWriteCostRate.addDelta(diff);
|
||||
}
|
||||
|
||||
void addTransactions(int count) { transactionCounter.addDelta(count); }
|
||||
|
||||
Optional<double> getTargetTotalTPSLimit() const {
|
||||
if (!quota.present())
|
||||
return {};
|
||||
auto readLimit = getReadTPSLimit();
|
||||
auto writeLimit = getWriteTPSLimit();
|
||||
|
||||
// TODO: Implement expiration logic
|
||||
if (!readLimit.present() && !writeLimit.present()) {
|
||||
return {};
|
||||
} else {
|
||||
if (!readLimit.present()) {
|
||||
return writeLimit.get();
|
||||
} else if (!writeLimit.present()) {
|
||||
return readLimit.get();
|
||||
} else {
|
||||
return std::min(readLimit.get(), writeLimit.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Optional<ClientTagThrottleLimits> updateAndGetPerClientLimit() {
|
||||
auto targetRate = getTargetTotalTPSLimit();
|
||||
if (targetRate.present() && transactionCounter.smoothRate() > 0) {
|
||||
auto newPerClientRate = std::max(
|
||||
SERVER_KNOBS->GLOBAL_TAG_THROTTLING_MIN_RATE,
|
||||
std::min(targetRate.get(),
|
||||
(targetRate.get() / transactionCounter.smoothRate()) * perClientRate.smoothTotal()));
|
||||
perClientRate.setTotal(newPerClientRate);
|
||||
return ClientTagThrottleLimits(perClientRate.getTotal(), ClientTagThrottleLimits::NO_EXPIRATION);
|
||||
} else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
void processTraceEvent(TraceEvent& te) const {
|
||||
if (quota.present()) {
|
||||
te.detail("ProvidedReadTPSLimit", getReadTPSLimit())
|
||||
.detail("ProvidedWriteTPSLimit", getWriteTPSLimit())
|
||||
.detail("ReadCostRate", totalReadCostRate.smoothTotal())
|
||||
.detail("WriteCostRate", totalWriteCostRate.smoothTotal())
|
||||
.detail("TotalReadQuota", quota.get().totalReadQuota)
|
||||
.detail("ReservedReadQuota", quota.get().reservedReadQuota)
|
||||
.detail("TotalWriteQuota", quota.get().totalWriteQuota)
|
||||
.detail("ReservedWriteQuota", quota.get().reservedWriteQuota);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Database db;
|
||||
UID id;
|
||||
std::map<TransactionTag, QuotaAndCounters> trackedTags;
|
||||
uint64_t throttledTagChangeId{ 0 };
|
||||
Future<Void> traceActor;
|
||||
|
||||
ACTOR static Future<Void> tracer(GlobalTagThrottlerImpl const* self) {
|
||||
loop {
|
||||
for (const auto& [tag, quotaAndCounters] : self->trackedTags) {
|
||||
TraceEvent te("GlobalTagThrottling");
|
||||
te.detail("Tag", tag);
|
||||
quotaAndCounters.processTraceEvent(te);
|
||||
}
|
||||
wait(delay(SERVER_KNOBS->GLOBAL_TAG_THROTTLING_TRACE_INTERVAL));
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorThrottlingChanges(GlobalTagThrottlerImpl* self) {
|
||||
loop {
|
||||
state ReadYourWritesTransaction tr(self->db);
|
||||
|
||||
loop {
|
||||
// TODO: Clean up quotas that have been removed
|
||||
try {
|
||||
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
|
||||
|
||||
state RangeResult currentQuotas = wait(tr.getRange(tagQuotaKeys, CLIENT_KNOBS->TOO_MANY));
|
||||
TraceEvent("GlobalTagThrottler_ReadCurrentQuotas").detail("Size", currentQuotas.size());
|
||||
for (auto const kv : currentQuotas) {
|
||||
auto const tag = kv.key.removePrefix(tagQuotaPrefix);
|
||||
auto const quota = ThrottleApi::TagQuotaValue::fromValue(kv.value);
|
||||
self->trackedTags[tag].setQuota(quota);
|
||||
}
|
||||
|
||||
++self->throttledTagChangeId;
|
||||
// FIXME: Should wait on watch instead
|
||||
// wait(tr.watch(tagThrottleSignalKey));
|
||||
wait(delay(5.0));
|
||||
TraceEvent("GlobalTagThrottler_ChangeSignaled");
|
||||
TEST(true); // Global tag throttler detected quota changes
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
TraceEvent("GlobalTagThrottlerMonitoringChangesError", self->id).error(e);
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
GlobalTagThrottlerImpl(Database db, UID id) : db(db), id(id) { traceActor = tracer(this); }
|
||||
Future<Void> monitorThrottlingChanges() { return monitorThrottlingChanges(this); }
|
||||
void addRequests(TransactionTag tag, int count) { trackedTags[tag].addTransactions(count); }
|
||||
uint64_t getThrottledTagChangeId() const { return throttledTagChangeId; }
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
|
||||
// TODO: For now, only enforce total throttling rates.
|
||||
// We should use reserved quotas as well.
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> result;
|
||||
for (auto& [tag, quotaAndCounters] : trackedTags) {
|
||||
// Currently there is no differentiation between batch priority and default priority transactions
|
||||
auto const limit = quotaAndCounters.updateAndGetPerClientLimit();
|
||||
if (limit.present()) {
|
||||
result[TransactionPriority::BATCH][tag] = result[TransactionPriority::DEFAULT][tag] = limit.get();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
int64_t autoThrottleCount() const { return trackedTags.size(); }
|
||||
uint32_t busyReadTagCount() const {
|
||||
// TODO: Implement
|
||||
return 0;
|
||||
}
|
||||
uint32_t busyWriteTagCount() const {
|
||||
// TODO: Implement
|
||||
return 0;
|
||||
}
|
||||
int64_t manualThrottleCount() const { return trackedTags.size(); }
|
||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
|
||||
for (const auto& busyReadTag : ss.busiestReadTags) {
|
||||
trackedTags[busyReadTag.tag].updateReadCostRate(ss.id, busyReadTag.rate);
|
||||
}
|
||||
for (const auto& busyWriteTag : ss.busiestWriteTags) {
|
||||
trackedTags[busyWriteTag.tag].updateWriteCostRate(ss.id, busyWriteTag.rate);
|
||||
}
|
||||
// TODO: Call ThrottleApi::throttleTags
|
||||
return Void();
|
||||
}
|
||||
|
||||
void setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
|
||||
trackedTags[tag].setQuota(tagQuotaValue);
|
||||
}
|
||||
};
|
||||
|
||||
GlobalTagThrottler::GlobalTagThrottler(Database db, UID id) : impl(PImpl<GlobalTagThrottlerImpl>::create(db, id)) {}
|
||||
|
||||
GlobalTagThrottler::~GlobalTagThrottler() = default;
|
||||
|
||||
Future<Void> GlobalTagThrottler::monitorThrottlingChanges() {
|
||||
return impl->monitorThrottlingChanges();
|
||||
}
|
||||
void GlobalTagThrottler::addRequests(TransactionTag tag, int count) {
|
||||
return impl->addRequests(tag, count);
|
||||
}
|
||||
uint64_t GlobalTagThrottler::getThrottledTagChangeId() const {
|
||||
return impl->getThrottledTagChangeId();
|
||||
}
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> GlobalTagThrottler::getClientRates() {
|
||||
return impl->getClientRates();
|
||||
}
|
||||
int64_t GlobalTagThrottler::autoThrottleCount() const {
|
||||
return impl->autoThrottleCount();
|
||||
}
|
||||
uint32_t GlobalTagThrottler::busyReadTagCount() const {
|
||||
return impl->busyReadTagCount();
|
||||
}
|
||||
uint32_t GlobalTagThrottler::busyWriteTagCount() const {
|
||||
return impl->busyWriteTagCount();
|
||||
}
|
||||
int64_t GlobalTagThrottler::manualThrottleCount() const {
|
||||
return impl->manualThrottleCount();
|
||||
}
|
||||
bool GlobalTagThrottler::isAutoThrottlingEnabled() const {
|
||||
return true;
|
||||
}
|
||||
Future<Void> GlobalTagThrottler::tryUpdateAutoThrottling(StorageQueueInfo const& ss) {
|
||||
return impl->tryUpdateAutoThrottling(ss);
|
||||
}
|
||||
|
||||
void GlobalTagThrottler::setQuota(TransactionTagRef tag, ThrottleApi::TagQuotaValue const& tagQuotaValue) {
|
||||
return impl->setQuota(tag, tagQuotaValue);
|
||||
}
|
||||
|
||||
namespace GlobalTagThrottlerTesting {
|
||||
|
||||
Optional<double> getTPSLimit(GlobalTagThrottler& globalTagThrottler, TransactionTag tag) {
|
||||
auto clientRates = globalTagThrottler.getClientRates();
|
||||
auto it1 = clientRates.find(TransactionPriority::DEFAULT);
|
||||
if (it1 != clientRates.end()) {
|
||||
auto it2 = it1->second.find(tag);
|
||||
if (it2 != it1->second.end()) {
|
||||
return it2->second.tpsRate;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
class StorageServerCollection {
|
||||
class Cost {
|
||||
Smoother smoother;
|
||||
|
||||
public:
|
||||
Cost() : smoother(5.0) {}
|
||||
Cost& operator+=(double delta) {
|
||||
smoother.addDelta(delta);
|
||||
return *this;
|
||||
}
|
||||
double smoothRate() const { return smoother.smoothRate(); }
|
||||
};
|
||||
|
||||
std::vector<std::map<TransactionTag, Cost>> readCosts;
|
||||
std::vector<std::map<TransactionTag, Cost>> writeCosts;
|
||||
|
||||
public:
|
||||
StorageServerCollection(size_t size) : readCosts(size), writeCosts(size) { ASSERT_GT(size, 0); }
|
||||
|
||||
void addReadCost(TransactionTag tag, double cost) {
|
||||
auto const costPerSS = cost / readCosts.size();
|
||||
for (auto& readCost : readCosts) {
|
||||
readCost[tag] += costPerSS;
|
||||
}
|
||||
}
|
||||
|
||||
void addWriteCost(TransactionTag tag, double cost) {
|
||||
auto const costPerSS = cost / writeCosts.size();
|
||||
for (auto& writeCost : writeCosts) {
|
||||
writeCost[tag] += costPerSS;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<StorageQueueInfo> getStorageQueueInfos() const {
|
||||
std::vector<StorageQueueInfo> result;
|
||||
result.reserve(readCosts.size());
|
||||
for (int i = 0; i < readCosts.size(); ++i) {
|
||||
StorageQueueInfo sqInfo(UID(i, i), LocalityData{});
|
||||
for (const auto& [tag, readCost] : readCosts[i]) {
|
||||
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
|
||||
sqInfo.busiestReadTags.emplace_back(tag, readCost.smoothRate(), fractionalBusyness);
|
||||
}
|
||||
for (const auto& [tag, writeCost] : writeCosts[i]) {
|
||||
double fractionalBusyness{ 0.0 }; // unused for global tag throttling
|
||||
sqInfo.busiestWriteTags.emplace_back(tag, writeCost.smoothRate(), fractionalBusyness);
|
||||
}
|
||||
result.push_back(sqInfo);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
ACTOR static Future<Void> runClient(GlobalTagThrottler* globalTagThrottler,
|
||||
StorageServerCollection* storageServers,
|
||||
TransactionTag tag,
|
||||
double desiredTpsRate,
|
||||
double costPerTransaction,
|
||||
bool write) {
|
||||
loop {
|
||||
auto tpsLimit = getTPSLimit(*globalTagThrottler, tag);
|
||||
state double tpsRate = tpsLimit.present() ? std::min<double>(desiredTpsRate, tpsLimit.get()) : desiredTpsRate;
|
||||
wait(delay(1 / tpsRate));
|
||||
if (write) {
|
||||
storageServers->addWriteCost(tag, costPerTransaction);
|
||||
} else {
|
||||
storageServers->addReadCost(tag, costPerTransaction);
|
||||
}
|
||||
globalTagThrottler->addRequests(tag, 1);
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorClientRates(GlobalTagThrottler* globalTagThrottler,
|
||||
TransactionTag tag,
|
||||
double desiredTPSLimit) {
|
||||
state int successes = 0;
|
||||
loop {
|
||||
wait(delay(1.0));
|
||||
auto currentTPSLimit = getTPSLimit(*globalTagThrottler, tag);
|
||||
if (currentTPSLimit.present()) {
|
||||
TraceEvent("GlobalTagThrottling_RateMonitor")
|
||||
.detail("Tag", tag)
|
||||
.detail("CurrentTPSRate", currentTPSLimit.get())
|
||||
.detail("DesiredTPSRate", desiredTPSLimit);
|
||||
if (abs(currentTPSLimit.get() - desiredTPSLimit) < 0.1) {
|
||||
if (++successes == 3) {
|
||||
return Void();
|
||||
}
|
||||
} else {
|
||||
successes = 0;
|
||||
}
|
||||
} else {
|
||||
successes = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> updateGlobalTagThrottler(GlobalTagThrottler* globalTagThrottler,
|
||||
StorageServerCollection const* storageServers) {
|
||||
loop {
|
||||
wait(delay(1.0));
|
||||
auto const storageQueueInfos = storageServers->getStorageQueueInfos();
|
||||
for (const auto& sq : storageQueueInfos) {
|
||||
globalTagThrottler->tryUpdateAutoThrottling(sq);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace GlobalTagThrottlerTesting
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/Simple") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/WriteThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalWriteQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, true);
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/MultiTagThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag1 = "sampleTag1"_sr;
|
||||
TransactionTag testTag2 = "sampleTag2"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag1, tagQuotaValue);
|
||||
globalTagThrottler.setQuota(testTag2, tagQuotaValue);
|
||||
state std::vector<Future<Void>> futures;
|
||||
state std::vector<Future<Void>> monitorFutures;
|
||||
futures.push_back(
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag1, 5.0, 6.0, false));
|
||||
futures.push_back(
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag2, 5.0, 6.0, false));
|
||||
futures.push_back(GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers));
|
||||
monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag1, 100.0 / 6.0));
|
||||
monitorFutures.push_back(GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag2, 100.0 / 6.0));
|
||||
wait(timeoutError(waitForAny(futures) || waitForAll(monitorFutures), 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/ActiveThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
|
||||
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 10.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/MultiClientThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> client2 =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
TEST_CASE("/GlobalTagThrottler/MultiClientActiveThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
|
||||
state Future<Void> client2 =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 20.0, 10.0, false);
|
||||
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 5.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Global transaction rate should be 20.0, with a distribution of (5, 15) between the 2 clients
|
||||
TEST_CASE("/GlobalTagThrottler/SkewedMultiClientActiveThrottling") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 5.0, false);
|
||||
state Future<Void> client2 =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 25.0, 5.0, false);
|
||||
state Future<Void> monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 15.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Test that the tag throttler can reach equilibrium, then adjust to a new equilibrium once the quota is changed
|
||||
TEST_CASE("/GlobalTagThrottler/UpdateQuota") {
|
||||
state GlobalTagThrottler globalTagThrottler(Database{}, UID{});
|
||||
state GlobalTagThrottlerTesting::StorageServerCollection storageServers(10);
|
||||
state ThrottleApi::TagQuotaValue tagQuotaValue;
|
||||
state TransactionTag testTag = "sampleTag1"_sr;
|
||||
tagQuotaValue.totalReadQuota = 100.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
state Future<Void> client =
|
||||
GlobalTagThrottlerTesting::runClient(&globalTagThrottler, &storageServers, testTag, 5.0, 6.0, false);
|
||||
state Future<Void> monitor =
|
||||
GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 100.0 / 6.0);
|
||||
state Future<Void> updater =
|
||||
GlobalTagThrottlerTesting::updateGlobalTagThrottler(&globalTagThrottler, &storageServers);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
tagQuotaValue.totalReadQuota = 50.0;
|
||||
globalTagThrottler.setQuota(testTag, tagQuotaValue);
|
||||
monitor = GlobalTagThrottlerTesting::monitorClientRates(&globalTagThrottler, testTag, 50.0 / 6.0);
|
||||
wait(timeoutError(monitor || client || updater, 300.0));
|
||||
return Void();
|
||||
}
|
|
@ -227,11 +227,6 @@ public:
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> monitorThrottlingChanges(Ratekeeper* self) {
|
||||
wait(self->tagThrottler->monitorThrottlingChanges());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
state Ratekeeper self(rkInterf.id(), openDBOnServer(dbInfo, TaskPriority::DefaultEndpoint, LockAware::True));
|
||||
state Future<Void> timeout = Void();
|
||||
|
@ -408,7 +403,7 @@ Future<Void> Ratekeeper::trackTLogQueueInfo(TLogInterface tli) {
|
|||
}
|
||||
|
||||
Future<Void> Ratekeeper::monitorThrottlingChanges() {
|
||||
return RatekeeperImpl::monitorThrottlingChanges(this);
|
||||
return tagThrottler->monitorThrottlingChanges();
|
||||
}
|
||||
|
||||
Future<Void> Ratekeeper::run(RatekeeperInterface rkInterf, Reference<AsyncVar<ServerDBInfo> const> dbInfo) {
|
||||
|
@ -436,7 +431,11 @@ Ratekeeper::Ratekeeper(UID id, Database db)
|
|||
SERVER_KNOBS->SPRING_BYTES_TLOG_BATCH,
|
||||
SERVER_KNOBS->MAX_TL_SS_VERSION_DIFFERENCE_BATCH,
|
||||
SERVER_KNOBS->TARGET_DURABILITY_LAG_VERSIONS_BATCH) {
|
||||
tagThrottler = std::make_unique<TagThrottler>(db, id);
|
||||
if (SERVER_KNOBS->GLOBAL_TAG_THROTTLING) {
|
||||
tagThrottler = std::make_unique<GlobalTagThrottler>(db, id);
|
||||
} else {
|
||||
tagThrottler = std::make_unique<TagThrottler>(db, id);
|
||||
}
|
||||
}
|
||||
|
||||
void Ratekeeper::updateCommitCostEstimation(
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/RkTagThrottleCollection.h"
|
||||
|
||||
double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional<double> requestRate) {
|
||||
double RkTagThrottleCollection::RkTagThrottleData::getTargetRate(Optional<double> requestRate) const {
|
||||
if (limits.tpsRate == 0.0 || !requestRate.present() || requestRate.get() == 0.0 || !rateSet) {
|
||||
return limits.tpsRate;
|
||||
} else {
|
||||
|
@ -347,10 +347,12 @@ int64_t RkTagThrottleCollection::manualThrottleCount() const {
|
|||
return count;
|
||||
}
|
||||
|
||||
void RkTagThrottleCollection::updateBusyTagCount(TagThrottledReason reason) {
|
||||
void RkTagThrottleCollection::incrementBusyTagCount(TagThrottledReason reason) {
|
||||
if (reason == TagThrottledReason::BUSY_READ) {
|
||||
++busyReadTagCount;
|
||||
} else if (reason == TagThrottledReason::BUSY_WRITE) {
|
||||
++busyWriteTagCount;
|
||||
} else {
|
||||
ASSERT(false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
#include "fdbserver/TagThrottler.h"
|
||||
#include "fdbserver/RkTagThrottleCollection.h"
|
||||
#include "flow/actorcompiler.h" // must be last include
|
||||
|
||||
class TagThrottlerImpl {
|
||||
Database db;
|
||||
|
@ -106,7 +107,7 @@ class TagThrottlerImpl {
|
|||
if (tagKey.throttleType == TagThrottleType::AUTO) {
|
||||
updatedTagThrottles.autoThrottleTag(
|
||||
self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
|
||||
updatedTagThrottles.updateBusyTagCount(tagValue.reason);
|
||||
updatedTagThrottles.incrementBusyTagCount(tagValue.reason);
|
||||
} else {
|
||||
updatedTagThrottles.manualThrottleTag(self->id,
|
||||
tag,
|
||||
|
@ -143,6 +144,7 @@ class TagThrottlerImpl {
|
|||
if (busyness > SERVER_KNOBS->AUTO_THROTTLE_TARGET_TAG_BUSYNESS && rate > SERVER_KNOBS->MIN_TAG_COST) {
|
||||
TEST(true); // Transaction tag auto-throttled
|
||||
Optional<double> clientRate = throttledTags.autoThrottleTag(id, tag, busyness);
|
||||
// TODO: Increment tag throttle counts here?
|
||||
if (clientRate.present()) {
|
||||
TagSet tags;
|
||||
tags.addTag(tag);
|
||||
|
@ -185,23 +187,21 @@ public:
|
|||
// the future
|
||||
auto storageQueue = ss.getStorageQueueBytes();
|
||||
auto storageDurabilityLag = ss.getDurabilityLag();
|
||||
std::vector<Future<Void>> futures;
|
||||
if (storageQueue > SERVER_KNOBS->AUTO_TAG_THROTTLE_STORAGE_QUEUE_BYTES ||
|
||||
storageDurabilityLag > SERVER_KNOBS->AUTO_TAG_THROTTLE_DURABILITY_LAG_VERSIONS) {
|
||||
// TODO: Update once size is potentially > 1
|
||||
ASSERT_WE_THINK(ss.busiestWriteTags.size() <= 1);
|
||||
ASSERT_WE_THINK(ss.busiestReadTags.size() <= 1);
|
||||
for (const auto& busyWriteTag : ss.busiestWriteTags) {
|
||||
return tryUpdateAutoThrottling(busyWriteTag.tag,
|
||||
busyWriteTag.rate,
|
||||
busyWriteTag.fractionalBusyness,
|
||||
TagThrottledReason::BUSY_WRITE);
|
||||
futures.push_back(tryUpdateAutoThrottling(busyWriteTag.tag,
|
||||
busyWriteTag.rate,
|
||||
busyWriteTag.fractionalBusyness,
|
||||
TagThrottledReason::BUSY_WRITE));
|
||||
}
|
||||
for (const auto& busyReadTag : ss.busiestReadTags) {
|
||||
return tryUpdateAutoThrottling(
|
||||
busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ);
|
||||
futures.push_back(tryUpdateAutoThrottling(
|
||||
busyReadTag.tag, busyReadTag.rate, busyReadTag.fractionalBusyness, TagThrottledReason::BUSY_READ));
|
||||
}
|
||||
}
|
||||
return Void();
|
||||
return waitForAll(futures);
|
||||
}
|
||||
|
||||
}; // class TagThrottlerImpl
|
||||
|
|
|
@ -18,50 +18,193 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "fdbserver/TransactionTagCounter.h"
|
||||
#include "flow/Trace.h"
|
||||
|
||||
TransactionTagCounter::TransactionTagCounter(UID thisServerID)
|
||||
: thisServerID(thisServerID),
|
||||
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
|
||||
namespace {
|
||||
|
||||
void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
if (tags.present()) {
|
||||
TEST(true); // Tracking transaction tag in counter
|
||||
double cost = costFunction(bytes);
|
||||
for (auto& tag : tags.get()) {
|
||||
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
|
||||
count += cost;
|
||||
if (count > busiestTagCount) {
|
||||
busiestTagCount = count;
|
||||
busiestTag = tag;
|
||||
class TopKTags {
|
||||
public:
|
||||
struct TagAndCount {
|
||||
TransactionTag tag;
|
||||
int64_t count;
|
||||
bool operator<(TagAndCount const& other) const { return count < other.count; }
|
||||
explicit TagAndCount(TransactionTag tag, int64_t count) : tag(tag), count(count) {}
|
||||
};
|
||||
|
||||
private:
|
||||
// Because the number of tracked is expected to be small, they can be tracked
|
||||
// in a simple vector. If the number of tracked tags increases, a more sophisticated
|
||||
// data structure will be required.
|
||||
std::vector<TagAndCount> topTags;
|
||||
int limit;
|
||||
|
||||
public:
|
||||
explicit TopKTags(int limit) : limit(limit) {
|
||||
ASSERT_GT(limit, 0);
|
||||
topTags.reserve(limit);
|
||||
}
|
||||
|
||||
void incrementCount(TransactionTag tag, int previousCount, int increase) {
|
||||
auto iter = std::find_if(topTags.begin(), topTags.end(), [tag](const auto& tc) { return tc.tag == tag; });
|
||||
if (iter != topTags.end()) {
|
||||
ASSERT_EQ(previousCount, iter->count);
|
||||
iter->count += increase;
|
||||
} else if (topTags.size() < limit) {
|
||||
ASSERT_EQ(previousCount, 0);
|
||||
topTags.emplace_back(tag, increase);
|
||||
} else {
|
||||
auto toReplace = std::min_element(topTags.begin(), topTags.end());
|
||||
ASSERT_GE(toReplace->count, previousCount);
|
||||
if (toReplace->count < previousCount + increase) {
|
||||
toReplace->tag = tag;
|
||||
toReplace->count = previousCount + increase;
|
||||
}
|
||||
}
|
||||
|
||||
intervalTotalSampledCount += cost;
|
||||
}
|
||||
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> getBusiestTags(double elapsed, double totalSampleCount) const {
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> result;
|
||||
for (auto const& tagAndCounter : topTags) {
|
||||
auto rate = (tagAndCounter.count / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE) / elapsed;
|
||||
if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) {
|
||||
result.emplace_back(tagAndCounter.tag, rate, tagAndCounter.count / totalSampleCount);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void clear() { topTags.clear(); }
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
class TransactionTagCounterImpl {
|
||||
UID thisServerID;
|
||||
TransactionTagMap<int64_t> intervalCounts;
|
||||
int64_t intervalTotalSampledCount = 0;
|
||||
TopKTags topTags;
|
||||
double intervalStart = 0;
|
||||
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
||||
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||
|
||||
static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
||||
|
||||
public:
|
||||
TransactionTagCounterImpl(UID thisServerID)
|
||||
: thisServerID(thisServerID), topTags(SERVER_KNOBS->SS_THROTTLE_TAGS_TRACKED),
|
||||
busiestReadTagEventHolder(makeReference<EventCacheHolder>(thisServerID.toString() + "/BusiestReadTag")) {}
|
||||
|
||||
void addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
if (tags.present()) {
|
||||
TEST(true); // Tracking transaction tag in counter
|
||||
double cost = costFunction(bytes);
|
||||
for (auto& tag : tags.get()) {
|
||||
int64_t& count = intervalCounts[TransactionTag(tag, tags.get().getArena())];
|
||||
topTags.incrementCount(tag, count, cost);
|
||||
count += cost;
|
||||
}
|
||||
|
||||
intervalTotalSampledCount += cost;
|
||||
}
|
||||
}
|
||||
|
||||
void startNewInterval() {
|
||||
double elapsed = now() - intervalStart;
|
||||
previousBusiestTags.clear();
|
||||
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
|
||||
previousBusiestTags = topTags.getBusiestTags(elapsed, intervalTotalSampledCount);
|
||||
|
||||
TraceEvent("BusiestReadTag", thisServerID)
|
||||
.detail("Elapsed", elapsed)
|
||||
//.detail("Tag", printable(busiestTag))
|
||||
//.detail("TagCost", busiestTagCount)
|
||||
.detail("TotalSampledCost", intervalTotalSampledCount)
|
||||
.detail("Reported", previousBusiestTags.size())
|
||||
.trackLatest(busiestReadTagEventHolder->trackingKey);
|
||||
}
|
||||
|
||||
intervalCounts.clear();
|
||||
intervalTotalSampledCount = 0;
|
||||
topTags.clear();
|
||||
intervalStart = now();
|
||||
}
|
||||
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const { return previousBusiestTags; }
|
||||
};
|
||||
|
||||
TransactionTagCounter::TransactionTagCounter(UID thisServerID)
|
||||
: impl(PImpl<TransactionTagCounterImpl>::create(thisServerID)) {}
|
||||
|
||||
TransactionTagCounter::~TransactionTagCounter() = default;
|
||||
|
||||
void TransactionTagCounter::addRequest(Optional<TagSet> const& tags, int64_t bytes) {
|
||||
return impl->addRequest(tags, bytes);
|
||||
}
|
||||
|
||||
void TransactionTagCounter::startNewInterval() {
|
||||
double elapsed = now() - intervalStart;
|
||||
previousBusiestTags.clear();
|
||||
if (intervalStart > 0 && CLIENT_KNOBS->READ_TAG_SAMPLE_RATE > 0 && elapsed > 0) {
|
||||
double rate = busiestTagCount / CLIENT_KNOBS->READ_TAG_SAMPLE_RATE / elapsed;
|
||||
if (rate > SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE) {
|
||||
previousBusiestTags.emplace_back(busiestTag, rate, (double)busiestTagCount / intervalTotalSampledCount);
|
||||
}
|
||||
|
||||
TraceEvent("BusiestReadTag", thisServerID)
|
||||
.detail("Elapsed", elapsed)
|
||||
.detail("Tag", printable(busiestTag))
|
||||
.detail("TagCost", busiestTagCount)
|
||||
.detail("TotalSampledCost", intervalTotalSampledCount)
|
||||
.detail("Reported", !previousBusiestTags.empty())
|
||||
.trackLatest(busiestReadTagEventHolder->trackingKey);
|
||||
}
|
||||
|
||||
intervalCounts.clear();
|
||||
intervalTotalSampledCount = 0;
|
||||
busiestTagCount = 0;
|
||||
intervalStart = now();
|
||||
return impl->startNewInterval();
|
||||
}
|
||||
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> const& TransactionTagCounter::getBusiestTags() const {
|
||||
return impl->getBusiestTags();
|
||||
}
|
||||
|
||||
TEST_CASE("/TransactionTagCounter/TopKTags") {
|
||||
TopKTags topTags(2);
|
||||
|
||||
// Ensure that costs are larger enough to show up
|
||||
auto const costMultiplier =
|
||||
std::max<double>(1.0, 2 * SERVER_KNOBS->MIN_TAG_READ_PAGES_RATE * CLIENT_KNOBS->READ_TAG_SAMPLE_RATE);
|
||||
|
||||
ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0);
|
||||
topTags.incrementCount("a"_sr, 0, 1 * costMultiplier);
|
||||
{
|
||||
auto const busiestTags = topTags.getBusiestTags(1.0, 1 * costMultiplier);
|
||||
ASSERT_EQ(busiestTags.size(), 1);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
|
||||
1);
|
||||
}
|
||||
topTags.incrementCount("b"_sr, 0, 2 * costMultiplier);
|
||||
topTags.incrementCount("c"_sr, 0, 3 * costMultiplier);
|
||||
{
|
||||
auto busiestTags = topTags.getBusiestTags(1.0, 6 * costMultiplier);
|
||||
ASSERT_EQ(busiestTags.size(), 2);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
|
||||
0);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }),
|
||||
1);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }),
|
||||
1);
|
||||
}
|
||||
topTags.incrementCount("a"_sr, 1 * costMultiplier, 3 * costMultiplier);
|
||||
{
|
||||
auto busiestTags = topTags.getBusiestTags(1.0, 9 * costMultiplier);
|
||||
ASSERT_EQ(busiestTags.size(), 2);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "a"_sr; }),
|
||||
1);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "b"_sr; }),
|
||||
0);
|
||||
ASSERT_EQ(std::count_if(busiestTags.begin(),
|
||||
busiestTags.end(),
|
||||
[](auto const& tagInfo) { return tagInfo.tag == "c"_sr; }),
|
||||
1);
|
||||
}
|
||||
topTags.clear();
|
||||
ASSERT_EQ(topTags.getBusiestTags(1.0, 0).size(), 0);
|
||||
return Void();
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ class Ratekeeper {
|
|||
double lastWarning;
|
||||
double lastSSListFetchedTimestamp;
|
||||
|
||||
std::unique_ptr<class TagThrottler> tagThrottler;
|
||||
std::unique_ptr<class ITagThrottler> tagThrottler;
|
||||
|
||||
RatekeeperLimits normalLimits;
|
||||
RatekeeperLimits batchLimits;
|
||||
|
|
|
@ -42,7 +42,7 @@ class RkTagThrottleCollection : NonCopyable {
|
|||
bool rateSet = false;
|
||||
|
||||
RkTagThrottleData() : clientRate(CLIENT_KNOBS->TAG_THROTTLE_SMOOTHING_WINDOW) {}
|
||||
double getTargetRate(Optional<double> requestRate);
|
||||
double getTargetRate(Optional<double> requestRate) const;
|
||||
Optional<double> updateAndGetClientRate(Optional<double> requestRate);
|
||||
};
|
||||
|
||||
|
@ -83,7 +83,7 @@ public:
|
|||
void addRequests(TransactionTag const& tag, int requests);
|
||||
int64_t autoThrottleCount() const { return autoThrottledTags.size(); }
|
||||
int64_t manualThrottleCount() const;
|
||||
void updateBusyTagCount(TagThrottledReason);
|
||||
void incrementBusyTagCount(TagThrottledReason);
|
||||
auto getBusyReadTagCount() const { return busyReadTagCount; }
|
||||
auto getBusyWriteTagCount() const { return busyWriteTagCount; }
|
||||
};
|
||||
|
|
|
@ -23,32 +23,72 @@
|
|||
#include "fdbclient/PImpl.h"
|
||||
#include "fdbserver/Ratekeeper.h"
|
||||
|
||||
class TagThrottler {
|
||||
class ITagThrottler {
|
||||
public:
|
||||
virtual ~ITagThrottler() = default;
|
||||
|
||||
// Poll the system keyspace looking for updates made through the tag throttling API
|
||||
virtual Future<Void> monitorThrottlingChanges() = 0;
|
||||
|
||||
// Increment the number of known requests associated with the specified tag
|
||||
virtual void addRequests(TransactionTag tag, int count) = 0;
|
||||
|
||||
// This throttled tag change ID is used to coordinate updates with the GRV proxies
|
||||
virtual uint64_t getThrottledTagChangeId() const = 0;
|
||||
|
||||
// For each tag and priority combination, return the throughput limit and expiration time
|
||||
// Also, erase expired tags
|
||||
virtual PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() = 0;
|
||||
|
||||
virtual int64_t autoThrottleCount() const = 0;
|
||||
virtual uint32_t busyReadTagCount() const = 0;
|
||||
virtual uint32_t busyWriteTagCount() const = 0;
|
||||
virtual int64_t manualThrottleCount() const = 0;
|
||||
virtual bool isAutoThrottlingEnabled() const = 0;
|
||||
|
||||
// Based on the busiest read and write tags in the provided storage queue info, update
|
||||
// tag throttling limits.
|
||||
virtual Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) = 0;
|
||||
};
|
||||
|
||||
class TagThrottler : public ITagThrottler {
|
||||
PImpl<class TagThrottlerImpl> impl;
|
||||
|
||||
public:
|
||||
TagThrottler(Database db, UID id);
|
||||
~TagThrottler();
|
||||
|
||||
// Poll the system keyspace looking for updates made through the tag throttling API
|
||||
Future<Void> monitorThrottlingChanges();
|
||||
|
||||
// Increment the number of known requests associated with the specified tag
|
||||
void addRequests(TransactionTag tag, int count);
|
||||
|
||||
// This throttled tag change ID is used to coordinate updates with the GRV proxies
|
||||
uint64_t getThrottledTagChangeId() const;
|
||||
|
||||
// For each tag and priority combination, return the throughput limit and expiration time
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates();
|
||||
|
||||
int64_t autoThrottleCount() const;
|
||||
uint32_t busyReadTagCount() const;
|
||||
uint32_t busyWriteTagCount() const;
|
||||
int64_t manualThrottleCount() const;
|
||||
bool isAutoThrottlingEnabled() const;
|
||||
|
||||
// Based on the busiest read and write tags in the provided storage queue info, update
|
||||
// tag throttling limits.
|
||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&);
|
||||
Future<Void> monitorThrottlingChanges() override;
|
||||
void addRequests(TransactionTag tag, int count) override;
|
||||
uint64_t getThrottledTagChangeId() const override;
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
|
||||
int64_t autoThrottleCount() const override;
|
||||
uint32_t busyReadTagCount() const override;
|
||||
uint32_t busyWriteTagCount() const override;
|
||||
int64_t manualThrottleCount() const override;
|
||||
bool isAutoThrottlingEnabled() const override;
|
||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
|
||||
};
|
||||
|
||||
class GlobalTagThrottler : public ITagThrottler {
|
||||
PImpl<class GlobalTagThrottlerImpl> impl;
|
||||
|
||||
public:
|
||||
GlobalTagThrottler(Database db, UID id);
|
||||
~GlobalTagThrottler();
|
||||
|
||||
Future<Void> monitorThrottlingChanges() override;
|
||||
void addRequests(TransactionTag tag, int count) override;
|
||||
uint64_t getThrottledTagChangeId() const override;
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() override;
|
||||
int64_t autoThrottleCount() const override;
|
||||
uint32_t busyReadTagCount() const override;
|
||||
uint32_t busyWriteTagCount() const override;
|
||||
int64_t manualThrottleCount() const override;
|
||||
bool isAutoThrottlingEnabled() const override;
|
||||
Future<Void> tryUpdateAutoThrottling(StorageQueueInfo const&) override;
|
||||
|
||||
// testing only
|
||||
public:
|
||||
void setQuota(TransactionTagRef, ThrottleApi::TagQuotaValue const&);
|
||||
};
|
||||
|
|
|
@ -20,25 +20,23 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "fdbclient/PImpl.h"
|
||||
#include "fdbclient/StorageServerInterface.h"
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
|
||||
class TransactionTagCounter {
|
||||
TransactionTagMap<int64_t> intervalCounts;
|
||||
int64_t intervalTotalSampledCount = 0;
|
||||
TransactionTag busiestTag;
|
||||
int64_t busiestTagCount = 0;
|
||||
double intervalStart = 0;
|
||||
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> previousBusiestTags;
|
||||
UID thisServerID;
|
||||
Reference<EventCacheHolder> busiestReadTagEventHolder;
|
||||
PImpl<class TransactionTagCounterImpl> impl;
|
||||
|
||||
public:
|
||||
TransactionTagCounter(UID thisServerID);
|
||||
static int64_t costFunction(int64_t bytes) { return bytes / SERVER_KNOBS->READ_COST_BYTE_FACTOR + 1; }
|
||||
~TransactionTagCounter();
|
||||
|
||||
// Update counters tracking the busyness of each tag in the current interval
|
||||
void addRequest(Optional<TagSet> const& tags, int64_t bytes);
|
||||
|
||||
// Save current set of busy tags and reset counters for next interval
|
||||
void startNewInterval();
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const { return previousBusiestTags; }
|
||||
|
||||
// Returns the set of busiest tags as of the end of the last interval
|
||||
std::vector<StorageQueuingMetricsReply::TagInfo> const& getBusiestTags() const;
|
||||
};
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* GlobalTagThrottling.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/TagThrottle.actor.h"
|
||||
#include "fdbserver/workloads/workloads.actor.h"
|
||||
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
class GlobalTagThrottlingWorkload : public TestWorkload {
|
||||
TransactionTag transactionTag;
|
||||
double reservedReadQuota{ 0.0 };
|
||||
double totalReadQuota{ 0.0 };
|
||||
double reservedWriteQuota{ 0.0 };
|
||||
double totalWriteQuota{ 0.0 };
|
||||
|
||||
ACTOR static Future<Void> setup(GlobalTagThrottlingWorkload* self, Database cx) {
|
||||
state Reference<ReadYourWritesTransaction> tr = makeReference<ReadYourWritesTransaction>(cx);
|
||||
loop {
|
||||
try {
|
||||
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
|
||||
TraceEvent("GlobalTagThrottlingWorkload_SettingTagQuota")
|
||||
.detail("Tag", self->transactionTag)
|
||||
.detail("ReservedReadQuota", self->reservedReadQuota)
|
||||
.detail("TotalReadQuota", self->totalReadQuota)
|
||||
.detail("ReservedWriteQuota", self->reservedWriteQuota)
|
||||
.detail("TotalWriteQuota", self->totalWriteQuota);
|
||||
ThrottleApi::setTagQuota(tr,
|
||||
self->transactionTag,
|
||||
self->reservedReadQuota,
|
||||
self->totalReadQuota,
|
||||
self->reservedWriteQuota,
|
||||
self->totalWriteQuota);
|
||||
wait(tr->commit());
|
||||
return Void();
|
||||
} catch (Error& e) {
|
||||
wait(tr->onError(e));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public:
|
||||
explicit GlobalTagThrottlingWorkload(WorkloadContext const& wcx) : TestWorkload(wcx) {
|
||||
transactionTag = getOption(options, "transactionTag"_sr, "sampleTag"_sr);
|
||||
reservedReadQuota = getOption(options, "reservedReadQuota"_sr, 0.0);
|
||||
totalReadQuota = getOption(options, "totalReadQuota"_sr, 0.0);
|
||||
reservedWriteQuota = getOption(options, "reservedWriteQuota"_sr, 0.0);
|
||||
totalWriteQuota = getOption(options, "totalWriteQuota"_sr, 0.0);
|
||||
}
|
||||
|
||||
std::string description() const override { return "GlobalTagThrottling"; }
|
||||
Future<Void> setup(Database const& cx) override { return clientId ? Void() : setup(this, cx); }
|
||||
Future<Void> start(Database const& cx) override { return Void(); }
|
||||
Future<bool> check(Database const& cx) override { return true; }
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {}
|
||||
};
|
||||
|
||||
WorkloadFactory<GlobalTagThrottlingWorkload> GlobalTagThrottlingWorkloadFactory("GlobalTagThrottling");
|
|
@ -61,6 +61,7 @@ struct ReadWriteCommonImpl {
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR static Future<Void> tracePeriodically(ReadWriteCommon* self) {
|
||||
state double start = now();
|
||||
state double elapsed = 0.0;
|
||||
|
@ -376,6 +377,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
bool adjacentReads; // keys are adjacent within a transaction
|
||||
bool adjacentWrites;
|
||||
int extraReadConflictRangesPerTransaction, extraWriteConflictRangesPerTransaction;
|
||||
Optional<Key> transactionTag;
|
||||
|
||||
int transactionsTagThrottled{ 0 };
|
||||
|
||||
// hot traffic pattern
|
||||
double hotKeyFraction, forceHotProbability = 0; // key based hot traffic setting
|
||||
|
@ -397,6 +401,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
rampUpConcurrency = getOption(options, LiteralStringRef("rampUpConcurrency"), false);
|
||||
batchPriority = getOption(options, LiteralStringRef("batchPriority"), false);
|
||||
descriptionString = getOption(options, LiteralStringRef("description"), LiteralStringRef("ReadWrite"));
|
||||
if (hasOption(options, LiteralStringRef("transactionTag"))) {
|
||||
transactionTag = getOption(options, LiteralStringRef("transactionTag"), ""_sr);
|
||||
}
|
||||
|
||||
if (rampUpConcurrency)
|
||||
ASSERT(rampSweepCount == 2); // Implementation is hard coded to ramp up and down
|
||||
|
@ -415,15 +422,18 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
}
|
||||
}
|
||||
|
||||
std::string description() const override { return descriptionString.toString(); }
|
||||
|
||||
template <class Trans>
|
||||
void setupTransaction(Trans* tr) {
|
||||
void setupTransaction(Trans& tr) {
|
||||
if (batchPriority) {
|
||||
tr->setOption(FDBTransactionOptions::PRIORITY_BATCH);
|
||||
tr.setOption(FDBTransactionOptions::PRIORITY_BATCH);
|
||||
}
|
||||
if (transactionTag.present() && tr.getTags().size() == 0) {
|
||||
tr.setOption(FDBTransactionOptions::AUTO_THROTTLE_TAG, transactionTag.get());
|
||||
}
|
||||
}
|
||||
|
||||
std::string description() const override { return descriptionString.toString(); }
|
||||
|
||||
void getMetrics(std::vector<PerfMetric>& m) override {
|
||||
ReadWriteCommon::getMetrics(m);
|
||||
if (!rampUpLoad) {
|
||||
|
@ -449,6 +459,9 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
m.emplace_back("Mean Commit Latency (ms)", 1000 * commitLatencies.mean(), Averaged::True);
|
||||
m.emplace_back("Median Commit Latency (ms, averaged)", 1000 * commitLatencies.median(), Averaged::True);
|
||||
m.emplace_back("Max Commit Latency (ms, averaged)", 1000 * commitLatencies.max(), Averaged::True);
|
||||
if (transactionTag.present()) {
|
||||
m.emplace_back("Transaction Tag Throttled", transactionsTagThrottled, Averaged::False);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -494,11 +507,14 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
state Transaction tr(cx);
|
||||
|
||||
try {
|
||||
self->setupTransaction(&tr);
|
||||
self->setupTransaction(tr);
|
||||
wait(self->readOp(&tr, keys, self, false));
|
||||
wait(tr.warmRange(allKeys));
|
||||
break;
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_tag_throttled) {
|
||||
++self->transactionsTagThrottled;
|
||||
}
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
|
@ -625,7 +641,7 @@ struct ReadWriteWorkload : ReadWriteCommon {
|
|||
|
||||
loop {
|
||||
try {
|
||||
self->setupTransaction(&tr);
|
||||
self->setupTransaction(tr);
|
||||
|
||||
GRVStartTime = now();
|
||||
self->transactionFailureMetric->startLatency = -1;
|
||||
|
|
|
@ -196,7 +196,7 @@ ERROR( key_not_tuple, 2041, "The key cannot be parsed as a tuple" );
|
|||
ERROR( value_not_tuple, 2042, "The value cannot be parsed as a tuple" );
|
||||
ERROR( mapper_not_tuple, 2043, "The mapper cannot be parsed as a tuple" );
|
||||
ERROR( invalid_checkpoint_format, 2044, "Invalid checkpoint format" )
|
||||
|
||||
ERROR( invalid_throttle_quota_value, 2045, "Failed to deserialize or initialize throttle quota value" )
|
||||
|
||||
ERROR( incompatible_protocol_version, 2100, "Incompatible protocol version" )
|
||||
ERROR( transaction_too_large, 2101, "Transaction exceeds byte limit" )
|
||||
|
|
|
@ -208,6 +208,7 @@ if(WITH_PYTHON)
|
|||
add_fdb_test(TEST_FILES rare/CycleWithKills.toml)
|
||||
add_fdb_test(TEST_FILES rare/CycleWithDeadHall.toml)
|
||||
add_fdb_test(TEST_FILES rare/FuzzTest.toml)
|
||||
add_fdb_test(TEST_FILES rare/GlobalTagThrottling.toml IGNORE)
|
||||
add_fdb_test(TEST_FILES rare/HighContentionPrefixAllocator.toml)
|
||||
add_fdb_test(TEST_FILES rare/InventoryTestHeavyWrites.toml)
|
||||
add_fdb_test(TEST_FILES rare/LargeApiCorrectness.toml)
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
[[test]]
|
||||
testTitle='GlobalTagThrottling'
|
||||
|
||||
[[test.knobs]]
|
||||
min_tag_read_pages_rate=1.0
|
||||
global_tag_throttling=true
|
||||
|
||||
[[test.workload]]
|
||||
testName='GlobalTagThrottling'
|
||||
transactionTag='sampleTag1'
|
||||
totalReadQuota=1.0
|
||||
|
||||
[[test.workload]]
|
||||
testName='ReadWrite'
|
||||
testDuration=600.0
|
||||
transactionsPerSecond=100
|
||||
writesPerTransactionA=0
|
||||
readsPerTransactionA=10
|
||||
writesPerTransactionB=0
|
||||
readsPerTransactionB=0
|
||||
alpha=0.0
|
||||
nodeCount=10000
|
||||
valueBytes=1000
|
||||
minValueBytes=1000
|
||||
warmingDelay=60.0
|
||||
transactionTag='sampleTag1'
|
||||
|
||||
[[test.workload]]
|
||||
testName='ReadWrite'
|
||||
testDuration=600.0
|
||||
transactionsPerSecond=100
|
||||
writesPerTransactionA=0
|
||||
readsPerTransactionA=10
|
||||
writesPerTransactionB=0
|
||||
readsPerTransactionB=0
|
||||
alpha=0.0
|
||||
nodeCount=10000
|
||||
valueBytes=1000
|
||||
minValueBytes=1000
|
||||
warmingDelay=60.0
|
||||
transactionTag='sampleTag2'
|
Loading…
Reference in New Issue