Merge pull request #3669 from sfc-gh-xwang/tag-report
Report recommended tx tag to be throttled to status json
This commit is contained in:
commit
e7662eecda
|
@ -456,16 +456,20 @@ disable
|
|||
|
||||
``throttle disable auto``
|
||||
|
||||
Disables cluster auto-throttling for busy transaction tags. This does not disable any currently active throttles. To do so, run the following command after disabling auto-throttling::
|
||||
|
||||
> throttle off auto
|
||||
Disables cluster auto-throttling for busy transaction tags. This may not disable currently active throttles immediately, seconds of delay is expected.
|
||||
|
||||
list
|
||||
^^^^
|
||||
|
||||
``throttle list [LIMIT]``
|
||||
``throttle list [throttled|recommended|all] [LIMIT]``
|
||||
|
||||
Prints a list of currently active transaction tag throttles.
|
||||
Prints a list of currently active transaction tag throttles, or recommended transaction tag throttles if auto-throttling is disabled.
|
||||
|
||||
``throttled`` - list active transaction tag throttles.
|
||||
|
||||
``recommended`` - list transaction tag throttles recommended by the ratekeeper, but not active yet.
|
||||
|
||||
``all`` - list both active and recommended transaction tag throttles.
|
||||
|
||||
``LIMIT`` - The number of throttles to print. Defaults to 100.
|
||||
|
||||
|
|
|
@ -313,11 +313,18 @@
|
|||
"batch_released_transactions_per_second":0,
|
||||
"released_transactions_per_second":0,
|
||||
"throttled_tags":{
|
||||
"auto":{
|
||||
"count":0
|
||||
"auto" : {
|
||||
"busy_read" : 0,
|
||||
"busy_write" : 0,
|
||||
"count" : 0
|
||||
},
|
||||
"manual":{
|
||||
"count":0
|
||||
"manual" : {
|
||||
"count" : 1
|
||||
},
|
||||
"recommend" : {
|
||||
"busy_read" : 0,
|
||||
"busy_write" : 0,
|
||||
"count" : 0
|
||||
}
|
||||
},
|
||||
"limiting_queue_bytes_storage_server":0,
|
||||
|
|
|
@ -132,3 +132,13 @@ log_server_min_free_space Log server running out of space (approaching
|
|||
log_server_min_free_space_ratio Log server running out of space (approaching 5% limit).
|
||||
storage_server_durability_lag Storage server durable version falling behind.
|
||||
=================================== ====================================================
|
||||
|
||||
The JSON path ``cluster.qos.throttled_tags``, when it exists, is an Object containing ``"auto"`` , ``"manual"`` and ``"recommended"``. The possible fields for those object are in the following table:
|
||||
|
||||
=================================== ====================================================
|
||||
Name Description
|
||||
=================================== ====================================================
|
||||
count How many tags are throttled
|
||||
busy_read How many tags are throttled because of busy read
|
||||
busy_write How many tags are throttled because of busy write
|
||||
=================================== ====================================================
|
|
@ -2541,6 +2541,16 @@ void throttleGenerator(const char* text, const char *line, std::vector<std::stri
|
|||
const char* opts[] = { "auto", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
else if(tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
|
||||
if(tokens.size() == 2) {
|
||||
const char* opts[] = { "throttled", "recommended", "all", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
else if(tokens.size() == 3) {
|
||||
const char* opts[] = {"LIMITS", nullptr};
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void fdbcliCompCmd(std::string const& text, std::vector<std::string>& lc) {
|
||||
|
@ -2661,6 +2671,14 @@ std::vector<const char*> throttleHintGenerator(std::vector<StringRef> const& tok
|
|||
else if((tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) && tokens.size() == 2) {
|
||||
return { "auto" };
|
||||
}
|
||||
else if(tokens.size() >= 2 && tokencmp(tokens[1], "list")) {
|
||||
if(tokens.size() == 2) {
|
||||
return { "[throttled|recommended|all]", "[LIMITS]" };
|
||||
}
|
||||
else if(tokens.size() == 3 && (tokencmp(tokens[2], "throttled") || tokencmp(tokens[2], "recommended") || tokencmp(tokens[2], "all"))){
|
||||
return {"[LIMITS]"};
|
||||
}
|
||||
}
|
||||
else if(tokens.size() == 2 && inArgument) {
|
||||
return { "[ARGS]" };
|
||||
}
|
||||
|
@ -4077,8 +4095,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
continue;
|
||||
}
|
||||
else if(tokencmp(tokens[1], "list")) {
|
||||
if(tokens.size() > 3) {
|
||||
printf("Usage: throttle list [LIMIT]\n");
|
||||
if(tokens.size() > 4) {
|
||||
printf("Usage: throttle list [throttled|recommended|all] [LIMIT]\n");
|
||||
printf("\n");
|
||||
printf("Lists tags that are currently throttled.\n");
|
||||
printf("The default LIMIT is 100 tags.\n");
|
||||
|
@ -4086,36 +4104,72 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
continue;
|
||||
}
|
||||
|
||||
state int throttleListLimit = 100;
|
||||
state bool reportThrottled = true;
|
||||
state bool reportRecommended = false;
|
||||
if(tokens.size() >= 3) {
|
||||
char *end;
|
||||
throttleListLimit = std::strtol((const char*)tokens[2].begin(), &end, 10);
|
||||
if ((tokens.size() > 3 && !std::isspace(*end)) || (tokens.size() == 3 && *end != '\0')) {
|
||||
printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[2]).c_str());
|
||||
if(tokencmp(tokens[2], "recommended")) {
|
||||
reportThrottled = false; reportRecommended = true;
|
||||
}
|
||||
else if(tokencmp(tokens[2], "all")){
|
||||
reportThrottled = true; reportRecommended = true;
|
||||
}
|
||||
else if(!tokencmp(tokens[2], "throttled")){
|
||||
printf("ERROR: failed to parse `%s'.\n", printable(tokens[2]).c_str());
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<TagThrottleInfo> tags = wait(ThrottleApi::getThrottledTags(db, throttleListLimit));
|
||||
state int throttleListLimit = 100;
|
||||
if(tokens.size() >= 4) {
|
||||
char *end;
|
||||
throttleListLimit = std::strtol((const char*)tokens[3].begin(), &end, 10);
|
||||
if ((tokens.size() > 4 && !std::isspace(*end)) || (tokens.size() == 4 && *end != '\0')) {
|
||||
printf("ERROR: failed to parse limit `%s'.\n", printable(tokens[3]).c_str());
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
state std::vector<TagThrottleInfo> tags;
|
||||
if(reportThrottled && reportRecommended) {
|
||||
wait(store(tags, ThrottleApi::getThrottledTags(db, throttleListLimit, true)));
|
||||
}
|
||||
else if(reportThrottled) {
|
||||
wait(store(tags, ThrottleApi::getThrottledTags(db, throttleListLimit)));
|
||||
}
|
||||
else if(reportRecommended) {
|
||||
wait(store(tags, ThrottleApi::getRecommendedTags(db, throttleListLimit)));
|
||||
}
|
||||
|
||||
bool anyLogged = false;
|
||||
for(auto itr = tags.begin(); itr != tags.end(); ++itr) {
|
||||
if(itr->expirationTime > now()) {
|
||||
if(!anyLogged) {
|
||||
printf("Throttled tags:\n\n");
|
||||
printf(" Rate (txn/s) | Expiration (s) | Priority | Type | Tag\n");
|
||||
printf(" --------------+----------------+-----------+--------+------------------\n");
|
||||
printf(" Rate (txn/s) | Expiration (s) | Priority | Type | Reason |Tag\n");
|
||||
printf(" --------------+----------------+-----------+--------+------------+------\n");
|
||||
|
||||
anyLogged = true;
|
||||
}
|
||||
|
||||
printf(" %12d | %13ds | %9s | %6s | %s\n",
|
||||
(int)(itr->tpsRate),
|
||||
std::min((int)(itr->expirationTime-now()), (int)(itr->initialDuration)),
|
||||
transactionPriorityToString(itr->priority, false),
|
||||
itr->throttleType == TagThrottleType::AUTO ? "auto" : "manual",
|
||||
itr->tag.toString().c_str());
|
||||
std::string reasonStr = "unset";
|
||||
if(itr->reason == TagThrottledReason::MANUAL){
|
||||
reasonStr = "manual";
|
||||
}
|
||||
else if(itr->reason == TagThrottledReason::BUSY_WRITE) {
|
||||
reasonStr = "busy write";
|
||||
}
|
||||
else if(itr->reason == TagThrottledReason::BUSY_READ) {
|
||||
reasonStr = "busy read";
|
||||
}
|
||||
|
||||
printf(" %12d | %13ds | %9s | %6s | %10s |%s\n", (int)(itr->tpsRate),
|
||||
std::min((int)(itr->expirationTime - now()), (int)(itr->initialDuration)),
|
||||
transactionPriorityToString(itr->priority, false),
|
||||
itr->throttleType == TagThrottleType::AUTO ? "auto" : "manual",
|
||||
reasonStr.c_str(),
|
||||
itr->tag.toString().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4124,7 +4178,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printf("Usage: throttle list [LIMIT]\n");
|
||||
}
|
||||
if(!anyLogged) {
|
||||
printf("There are no throttled tags\n");
|
||||
printf("There are no %s tags\n", reportThrottled ? "throttled" : "recommended");
|
||||
}
|
||||
}
|
||||
else if(tokencmp(tokens[1], "on")) {
|
||||
|
|
|
@ -110,6 +110,31 @@ TagThrottleValue TagThrottleValue::fromValue(const ValueRef& value) {
|
|||
}
|
||||
|
||||
namespace ThrottleApi {
|
||||
ACTOR Future<bool> getValidAutoEnabled(Transaction* tr, Database db) {
|
||||
state bool result;
|
||||
loop {
|
||||
Optional<Value> value = wait(tr->get(tagThrottleAutoEnabledKey));
|
||||
if(!value.present()) {
|
||||
tr->reset();
|
||||
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
|
||||
continue;
|
||||
}
|
||||
else if(value.get() == LiteralStringRef("1")) {
|
||||
result = true;
|
||||
}
|
||||
else if(value.get() == LiteralStringRef("0")) {
|
||||
result = false;
|
||||
}
|
||||
else {
|
||||
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", db->dbId).detail("Value", value.get());
|
||||
tr->reset();
|
||||
wait(delay(CLIENT_KNOBS->DEFAULT_BACKOFF));
|
||||
continue;
|
||||
}
|
||||
return result;
|
||||
};
|
||||
}
|
||||
|
||||
void signalThrottleChange(Transaction &tr) {
|
||||
tr.atomicOp(tagThrottleSignalKey, LiteralStringRef("XXXXXXXXXX\x00\x00\x00\x00"), MutationRef::SetVersionstampedValue);
|
||||
}
|
||||
|
@ -146,12 +171,16 @@ namespace ThrottleApi {
|
|||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<std::vector<TagThrottleInfo>> getThrottledTags(Database db, int limit) {
|
||||
ACTOR Future<std::vector<TagThrottleInfo>> getThrottledTags(Database db, int limit, bool containsRecommend) {
|
||||
state Transaction tr(db);
|
||||
|
||||
state bool reportAuto = containsRecommend;
|
||||
loop {
|
||||
try {
|
||||
Standalone<RangeResultRef> throttles = wait(tr.getRange(tagThrottleKeys, limit));
|
||||
if (!containsRecommend) {
|
||||
wait(store(reportAuto, getValidAutoEnabled(&tr, db)));
|
||||
}
|
||||
Standalone<RangeResultRef> throttles = wait(tr.getRange(
|
||||
reportAuto ? tagThrottleKeys : KeyRangeRef(tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix), limit));
|
||||
std::vector<TagThrottleInfo> results;
|
||||
for(auto throttle : throttles) {
|
||||
results.push_back(TagThrottleInfo(TagThrottleKey::fromKey(throttle.key), TagThrottleValue::fromValue(throttle.value)));
|
||||
|
@ -164,13 +193,41 @@ namespace ThrottleApi {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> throttleTags(Database db, TagSet tags, double tpsRate, double initialDuration, TagThrottleType throttleType, TransactionPriority priority, Optional<double> expirationTime) {
|
||||
ACTOR Future<std::vector<TagThrottleInfo>> getRecommendedTags(Database db, int limit) {
|
||||
state Transaction tr(db);
|
||||
loop {
|
||||
try {
|
||||
bool enableAuto = wait(getValidAutoEnabled(&tr, db));
|
||||
if(enableAuto) {
|
||||
return std::vector<TagThrottleInfo>();
|
||||
}
|
||||
|
||||
Standalone<RangeResultRef> throttles = wait(tr.getRange(KeyRangeRef(tagThrottleAutoKeysPrefix, tagThrottleKeys.end), limit));
|
||||
std::vector<TagThrottleInfo> results;
|
||||
for(auto throttle : throttles) {
|
||||
results.push_back(TagThrottleInfo(TagThrottleKey::fromKey(throttle.key), TagThrottleValue::fromValue(throttle.value)));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
catch(Error& e) {
|
||||
wait(tr.onError(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> throttleTags(Database db, TagSet tags, double tpsRate, double initialDuration,
|
||||
TagThrottleType throttleType, TransactionPriority priority, Optional<double> expirationTime,
|
||||
Optional<TagThrottledReason> reason) {
|
||||
state Transaction tr(db);
|
||||
state Key key = TagThrottleKey(tags, throttleType, priority).toKey();
|
||||
|
||||
ASSERT(initialDuration > 0);
|
||||
|
||||
TagThrottleValue throttle(tpsRate, expirationTime.present() ? expirationTime.get() : 0, initialDuration);
|
||||
if(throttleType == TagThrottleType::MANUAL) {
|
||||
reason = TagThrottledReason::MANUAL;
|
||||
}
|
||||
TagThrottleValue throttle(tpsRate, expirationTime.present() ? expirationTime.get() : 0, initialDuration,
|
||||
reason.present() ? reason.get() : TagThrottledReason::UNSET);
|
||||
BinaryWriter wr(IncludeVersion(ProtocolVersion::withTagThrottleValue()));
|
||||
wr << throttle;
|
||||
state Value value = wr.toValue();
|
||||
|
|
|
@ -115,6 +115,13 @@ enum class TagThrottleType : uint8_t {
|
|||
AUTO
|
||||
};
|
||||
|
||||
enum class TagThrottledReason: uint8_t {
|
||||
UNSET = 0,
|
||||
MANUAL,
|
||||
BUSY_READ,
|
||||
BUSY_WRITE
|
||||
};
|
||||
|
||||
struct TagThrottleKey {
|
||||
TagSet tags;
|
||||
TagThrottleType throttleType;
|
||||
|
@ -132,17 +139,26 @@ struct TagThrottleValue {
|
|||
double tpsRate;
|
||||
double expirationTime;
|
||||
double initialDuration;
|
||||
TagThrottledReason reason;
|
||||
|
||||
TagThrottleValue() : tpsRate(0), expirationTime(0), initialDuration(0) {}
|
||||
TagThrottleValue(double tpsRate, double expirationTime, double initialDuration)
|
||||
: tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration) {}
|
||||
TagThrottleValue() : tpsRate(0), expirationTime(0), initialDuration(0), reason(TagThrottledReason::UNSET) {}
|
||||
TagThrottleValue(double tpsRate, double expirationTime, double initialDuration, TagThrottledReason reason)
|
||||
: tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration), reason(reason) {}
|
||||
|
||||
static TagThrottleValue fromValue(const ValueRef& value);
|
||||
|
||||
//To change this serialization, ProtocolVersion::TagThrottleValue must be updated, and downgrades need to be considered
|
||||
template<class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, tpsRate, expirationTime, initialDuration);
|
||||
if(ar.protocolVersion().hasTagThrottleValueReason()) {
|
||||
serializer(ar, tpsRate, expirationTime, initialDuration, reinterpret_cast<uint8_t&>(reason));
|
||||
}
|
||||
else if(ar.protocolVersion().hasTagThrottleValue()) {
|
||||
serializer(ar, tpsRate, expirationTime, initialDuration);
|
||||
if(ar.isDeserializing) {
|
||||
reason = TagThrottledReason::UNSET;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -153,12 +169,13 @@ struct TagThrottleInfo {
|
|||
double tpsRate;
|
||||
double expirationTime;
|
||||
double initialDuration;
|
||||
TagThrottledReason reason;
|
||||
|
||||
TagThrottleInfo(TransactionTag tag, TagThrottleType throttleType, TransactionPriority priority, double tpsRate, double expirationTime, double initialDuration)
|
||||
: tag(tag), throttleType(throttleType), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration) {}
|
||||
TagThrottleInfo(TransactionTag tag, TagThrottleType throttleType, TransactionPriority priority, double tpsRate, double expirationTime, double initialDuration, TagThrottledReason reason = TagThrottledReason::UNSET)
|
||||
: tag(tag), throttleType(throttleType), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration), reason(reason) {}
|
||||
|
||||
TagThrottleInfo(TagThrottleKey key, TagThrottleValue value)
|
||||
: throttleType(key.throttleType), priority(key.priority), tpsRate(value.tpsRate), expirationTime(value.expirationTime), initialDuration(value.initialDuration)
|
||||
TagThrottleInfo(TagThrottleKey key, TagThrottleValue value)
|
||||
: throttleType(key.throttleType), priority(key.priority), tpsRate(value.tpsRate), expirationTime(value.expirationTime), initialDuration(value.initialDuration), reason(value.reason)
|
||||
{
|
||||
ASSERT(key.tags.size() == 1); // Multiple tags per throttle is not currently supported
|
||||
tag = *key.tags.begin();
|
||||
|
@ -166,10 +183,12 @@ struct TagThrottleInfo {
|
|||
};
|
||||
|
||||
namespace ThrottleApi {
|
||||
Future<std::vector<TagThrottleInfo>> getThrottledTags(Database const& db, int const& limit);
|
||||
Future<std::vector<TagThrottleInfo>> getThrottledTags(Database const& db, int const& limit, bool const& containsRecommend = false);
|
||||
Future<std::vector<TagThrottleInfo>> getRecommendedTags(Database const& db, int const& limit);
|
||||
|
||||
Future<Void> throttleTags(Database const& db, TagSet const& tags, double const& tpsRate, double const& initialDuration,
|
||||
TagThrottleType const& throttleType, TransactionPriority const& priority, Optional<double> const& expirationTime = Optional<double>());
|
||||
TagThrottleType const& throttleType, TransactionPriority const& priority, Optional<double> const& expirationTime = Optional<double>(),
|
||||
Optional<TagThrottledReason> const& reason = Optional<TagThrottledReason>());
|
||||
|
||||
Future<bool> unthrottleTags(Database const& db, TagSet const& tags, Optional<TagThrottleType> const& throttleType, Optional<TransactionPriority> const& priority);
|
||||
|
||||
|
|
|
@ -338,7 +338,7 @@ public:
|
|||
return Optional<ClientTagThrottleLimits>();
|
||||
}
|
||||
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates() {
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> getClientRates(bool autoThrottlingEnabled) {
|
||||
PrioritizedTransactionTagMap<ClientTagThrottleLimits> clientRates;
|
||||
|
||||
for(auto tagItr = tagData.begin(); tagItr != tagData.end();) {
|
||||
|
@ -401,14 +401,18 @@ public:
|
|||
}
|
||||
|
||||
tagPresent = true;
|
||||
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(tagItr->first, adjustedRate, autoItr->second.limits.expiration);
|
||||
if(!result.second && result.first->second.tpsRate > adjustedRate) {
|
||||
result.first->second = ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
|
||||
if (autoThrottlingEnabled) {
|
||||
auto result = clientRates[TransactionPriority::DEFAULT].try_emplace(
|
||||
tagItr->first, adjustedRate, autoItr->second.limits.expiration);
|
||||
if (!result.second && result.first->second.tpsRate > adjustedRate) {
|
||||
result.first->second =
|
||||
ClientTagThrottleLimits(adjustedRate, autoItr->second.limits.expiration);
|
||||
} else {
|
||||
TEST(true); // Auto throttle overriden by manual throttle
|
||||
}
|
||||
clientRates[TransactionPriority::BATCH][tagItr->first] =
|
||||
ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
|
||||
}
|
||||
else {
|
||||
TEST(true); // Auto throttle overriden by manual throttle
|
||||
}
|
||||
clientRates[TransactionPriority::BATCH][tagItr->first] = ClientTagThrottleLimits(0, autoItr->second.limits.expiration);
|
||||
}
|
||||
else {
|
||||
ASSERT(autoItr->second.limits.expiration <= now());
|
||||
|
@ -481,6 +485,7 @@ public:
|
|||
TransactionTagMap<RkTagThrottleData> autoThrottledTags;
|
||||
TransactionTagMap<std::map<TransactionPriority, RkTagThrottleData>> manualThrottledTags;
|
||||
TransactionTagMap<RkTagData> tagData;
|
||||
uint32_t busyReadTagCount = 0, busyWriteTagCount = 0;
|
||||
};
|
||||
|
||||
struct RatekeeperLimits {
|
||||
|
@ -787,6 +792,8 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
|||
TraceEvent(SevWarnAlways, "InvalidAutoTagThrottlingValue", self->id).detail("Value", autoThrottlingEnabled.get().get());
|
||||
}
|
||||
self->autoThrottlingEnabled = SERVER_KNOBS->AUTO_TAG_THROTTLING_ENABLED;
|
||||
if(!committed)
|
||||
tr.set(tagThrottleAutoEnabledKey, LiteralStringRef(self->autoThrottlingEnabled ? "1" : "0"));
|
||||
}
|
||||
|
||||
RkTagThrottleCollection updatedTagThrottles;
|
||||
|
@ -814,6 +821,12 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
|||
|
||||
if(tagKey.throttleType == TagThrottleType::AUTO) {
|
||||
updatedTagThrottles.autoThrottleTag(self->id, tag, 0, tagValue.tpsRate, tagValue.expirationTime);
|
||||
if(tagValue.reason == TagThrottledReason::BUSY_READ){
|
||||
updatedTagThrottles.busyReadTagCount ++;
|
||||
}
|
||||
else if(tagValue.reason == TagThrottledReason::BUSY_WRITE) {
|
||||
updatedTagThrottles.busyWriteTagCount ++;
|
||||
}
|
||||
}
|
||||
else {
|
||||
updatedTagThrottles.manualThrottleTag(self->id, tag, tagKey.priority, tagValue.tpsRate, tagValue.expirationTime, oldLimits);
|
||||
|
@ -849,7 +862,10 @@ void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss) {
|
|||
TagSet tags;
|
||||
tags.addTag(ss.busiestTag.get());
|
||||
|
||||
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
|
||||
self->addActor.send(ThrottleApi::throttleTags(
|
||||
self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO,
|
||||
TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION,
|
||||
TagThrottledReason::BUSY_READ));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1197,7 +1213,10 @@ void updateRate(RatekeeperData* self, RatekeeperLimits* limits) {
|
|||
.detail("WorstStorageServerDurabilityLag", worstDurabilityLag)
|
||||
.detail("LimitingStorageServerDurabilityLag", limitingDurabilityLag)
|
||||
.detail("TagsAutoThrottled", self->throttledTags.autoThrottleCount())
|
||||
.detail("TagsAutoThrottledBusyRead", self->throttledTags.busyReadTagCount)
|
||||
.detail("TagsAutoThrottledBusyWrite", self->throttledTags.busyWriteTagCount)
|
||||
.detail("TagsManuallyThrottled", self->throttledTags.manualThrottleCount())
|
||||
.detail("AutoThrottlingEnabled", self->autoThrottlingEnabled)
|
||||
.trackLatest(name);
|
||||
}
|
||||
}
|
||||
|
@ -1307,7 +1326,7 @@ ACTOR Future<Void> ratekeeper(RatekeeperInterface rkInterf, Reference<AsyncVar<S
|
|||
p.lastThrottledTagChangeId = self.throttledTagChangeId;
|
||||
p.lastTagPushTime = now();
|
||||
|
||||
reply.throttledTags = self.throttledTags.getClientRates();
|
||||
reply.throttledTags = self.throttledTags.getClientRates(self.autoThrottlingEnabled);
|
||||
TEST(reply.throttledTags.present() && reply.throttledTags.get().size() > 0); // Returning tag throttles to a proxy
|
||||
}
|
||||
|
||||
|
|
|
@ -1745,11 +1745,14 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
state TraceEventFields ratekeeper = wait( timeoutError(rkWorker.interf.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdate") ) ), 1.0) );
|
||||
TraceEventFields batchRatekeeper = wait( timeoutError(rkWorker.interf.eventLogRequest.getReply( EventLogRequest(LiteralStringRef("RkUpdateBatch") ) ), 1.0) );
|
||||
|
||||
bool autoThrottlingEnabled = ratekeeper.getInt("AutoThrottlingEnabled");
|
||||
double tpsLimit = ratekeeper.getDouble("TPSLimit");
|
||||
double batchTpsLimit = batchRatekeeper.getDouble("TPSLimit");
|
||||
double transPerSec = ratekeeper.getDouble("ReleasedTPS");
|
||||
double batchTransPerSec = ratekeeper.getDouble("ReleasedBatchTPS");
|
||||
int autoThrottledTags = ratekeeper.getInt("TagsAutoThrottled");
|
||||
int autoThrottledTagsBusyRead = ratekeeper.getInt("TagsAutoThrottledBusyRead");
|
||||
int autoThrottledTagsBusyWrite = ratekeeper.getInt("TagsAutoThrottledBusyWrite");
|
||||
int manualThrottledTags = ratekeeper.getInt("TagsManuallyThrottled");
|
||||
int ssCount = ratekeeper.getInt("StorageServers");
|
||||
int tlogCount = ratekeeper.getInt("TLogs");
|
||||
|
@ -1779,9 +1782,28 @@ ACTOR static Future<JsonBuilderObject> workloadStatusFetcher(Reference<AsyncVar<
|
|||
(*qos)["batch_released_transactions_per_second"] = batchTransPerSec;
|
||||
|
||||
JsonBuilderObject throttledTagsObj;
|
||||
JsonBuilderObject autoThrottledTagsObj;
|
||||
autoThrottledTagsObj["count"] = autoThrottledTags;
|
||||
JsonBuilderObject autoThrottledTagsObj, recommendThrottleTagsObj;
|
||||
if(autoThrottlingEnabled) {
|
||||
autoThrottledTagsObj["count"] = autoThrottledTags;
|
||||
autoThrottledTagsObj["busy_read"] = autoThrottledTagsBusyRead;
|
||||
autoThrottledTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
|
||||
|
||||
recommendThrottleTagsObj["count"] = 0;
|
||||
recommendThrottleTagsObj["busy_read"] = 0;
|
||||
recommendThrottleTagsObj["busy_write"] = 0;
|
||||
}
|
||||
else {
|
||||
recommendThrottleTagsObj["count"] = autoThrottledTags;
|
||||
recommendThrottleTagsObj["busy_read"] = autoThrottledTagsBusyRead;
|
||||
recommendThrottleTagsObj["busy_write"] = autoThrottledTagsBusyWrite;
|
||||
|
||||
autoThrottledTagsObj["count"] = 0;
|
||||
autoThrottledTagsObj["busy_read"] = 0;
|
||||
autoThrottledTagsObj["busy_write"] = 0;
|
||||
}
|
||||
|
||||
throttledTagsObj["auto"] = autoThrottledTagsObj;
|
||||
throttledTagsObj["recommend"] = recommendThrottleTagsObj;
|
||||
|
||||
JsonBuilderObject manualThrottledTagsObj;
|
||||
manualThrottledTagsObj["count"] = manualThrottledTags;
|
||||
|
|
|
@ -128,6 +128,7 @@ public: // introduced features
|
|||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, ReportConflictingKeys);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, SmallEndpoints);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B063010000LL, CacheRole);
|
||||
PROTOCOL_VERSION_FEATURE(0x0FDB00B070010001LL, TagThrottleValueReason);
|
||||
};
|
||||
|
||||
// These impact both communications and the deserialization of certain database and IKeyValueStore keys.
|
||||
|
|
Loading…
Reference in New Issue