Support throttling and unthrottling tags by priority and their auto/manual state in fdbcli.
This commit is contained in:
parent
f1477b09e9
commit
14b23c146f
|
@ -2509,12 +2509,18 @@ void throttleGenerator(const char* text, const char *line, std::vector<std::stri
|
|||
const char* opts[] = { "on tag", "off", "enable auto", "disable auto", "list", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
else if(tokens.size() == 2 && tokencmp(tokens[1], "on")) {
|
||||
const char* opts[] = { "tag", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
else if(tokens.size() >= 2 && tokencmp(tokens[1], "on")) {
|
||||
if(tokens.size() == 2) {
|
||||
const char* opts[] = { "tag", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
else if(tokens.size() == 6) {
|
||||
const char* opts[] = { "default", "immediate", "batch", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
}
|
||||
else if(tokens.size() == 2 && tokencmp(tokens[1], "off")) {
|
||||
const char* opts[] = { "all", "auto", "manual", "tag", nullptr };
|
||||
else if(tokens.size() >= 2 && tokencmp(tokens[1], "off") && !tokencmp(tokens[tokens.size()-1], "tag")) {
|
||||
const char* opts[] = { "all", "auto", "manual", "tag", "default", "immediate", "batch", nullptr };
|
||||
arrayGenerator(text, line, opts, lc);
|
||||
}
|
||||
else if(tokens.size() == 2 && tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) {
|
||||
|
@ -2592,23 +2598,50 @@ std::vector<const char*> throttleHintGenerator(std::vector<StringRef> const& tok
|
|||
return { "<on|off|enable auto|disable auto|list>", "[ARGS]" };
|
||||
}
|
||||
else if(tokencmp(tokens[1], "on")) {
|
||||
std::vector<const char*> opts = { "tag", "<TAG>", "[RATE]", "[DURATION]" };
|
||||
std::vector<const char*> opts = { "tag", "<TAG>", "[RATE]", "[DURATION]", "[default|immediate|batch]" };
|
||||
if(tokens.size() == 2) {
|
||||
return opts;
|
||||
}
|
||||
else if(((tokens.size() == 3 && inArgument) || tokencmp(tokens[2], "tag")) && tokens.size() < 6) {
|
||||
else if(((tokens.size() == 3 && inArgument) || tokencmp(tokens[2], "tag")) && tokens.size() < 7) {
|
||||
return std::vector<const char*>(opts.begin() + tokens.size() - 2, opts.end());
|
||||
}
|
||||
}
|
||||
else if(tokencmp(tokens[1], "off")) {
|
||||
if(tokens.size() == 2) {
|
||||
return { "<all|auto|manual|tag>", "[ARGS]" };
|
||||
}
|
||||
else if(tokens.size() == 3 && tokencmp(tokens[2], "tag")) {
|
||||
if(tokencmp(tokens[tokens.size()-1], "tag")) {
|
||||
return { "<TAG>" };
|
||||
}
|
||||
else if(tokens.size() == 3 && inArgument) {
|
||||
return { "[ARGS]" };
|
||||
else {
|
||||
bool hasType = false;
|
||||
bool hasTag = false;
|
||||
bool hasPriority = false;
|
||||
for(int i = 2; i < tokens.size(); ++i) {
|
||||
if(tokencmp(tokens[i], "all") || tokencmp(tokens[i], "auto") || tokencmp(tokens[i], "manual")) {
|
||||
hasType = true;
|
||||
}
|
||||
else if(tokencmp(tokens[i], "default") || tokencmp(tokens[i], "immediate") || tokencmp(tokens[i], "batch")) {
|
||||
hasPriority = true;
|
||||
}
|
||||
else if(tokencmp(tokens[i], "tag")) {
|
||||
hasTag = true;
|
||||
++i;
|
||||
}
|
||||
else {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<const char*> options;
|
||||
if(!hasType) {
|
||||
options.push_back("[all|auto|manual]");
|
||||
}
|
||||
if(!hasTag) {
|
||||
options.push_back("[tag <TAG>]");
|
||||
}
|
||||
if(!hasPriority) {
|
||||
options.push_back("[default|immediate|batch]");
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
}
|
||||
else if((tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) && tokens.size() == 2) {
|
||||
|
@ -3976,7 +4009,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
(int)(itr->tpsRate),
|
||||
std::min((int)(itr->expirationTime-now()), (int)(itr->initialDuration)),
|
||||
transactionPriorityToString(itr->priority, false),
|
||||
itr->autoThrottled ? "auto" : "manual",
|
||||
itr->throttleType == TagThrottleType::AUTO ? "auto" : "manual",
|
||||
itr->tag.toString().c_str());
|
||||
}
|
||||
}
|
||||
|
@ -3989,19 +4022,21 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
printf("There are no throttled tags\n");
|
||||
}
|
||||
}
|
||||
else if(tokencmp(tokens[1], "on") && tokens.size() <=6) {
|
||||
if(tokens.size() < 4 || !tokencmp(tokens[2], "tag")) {
|
||||
printf("Usage: throttle on tag <TAG> [RATE] [DURATION]\n");
|
||||
else if(tokencmp(tokens[1], "on")) {
|
||||
if(tokens.size() < 4 || !tokencmp(tokens[2], "tag") || tokens.size() > 7) {
|
||||
printf("Usage: throttle on tag <TAG> [RATE] [DURATION] [PRIORITY]\n");
|
||||
printf("\n");
|
||||
printf("Enables throttling for transactions with the specified tag.\n");
|
||||
printf("An optional transactions per second rate can be specified (default 0).\n");
|
||||
printf("An optional duration can be specified, which must include a time suffix (s, m, h, d) (default 1h).\n");
|
||||
printf("An optional priority can be specified. Choices are `default', `immediate', and `batch' (default `default').\n");
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
double tpsRate = 0.0;
|
||||
uint64_t duration = 3600;
|
||||
TransactionPriority priority = TransactionPriority::DEFAULT;
|
||||
|
||||
if(tokens.size() >= 5) {
|
||||
char *end;
|
||||
|
@ -4025,70 +4060,145 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
|
|||
continue;
|
||||
}
|
||||
duration = parsedDuration.get();
|
||||
}
|
||||
|
||||
if(duration == 0) {
|
||||
printf("ERROR: throttle duration cannot be 0\n");
|
||||
is_error = true;
|
||||
continue;
|
||||
if(duration == 0) {
|
||||
printf("ERROR: throttle duration cannot be 0\n");
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if(tokens.size() == 7) {
|
||||
if(tokens[6] == LiteralStringRef("default")) {
|
||||
priority = TransactionPriority::DEFAULT;
|
||||
}
|
||||
else if(tokens[6] == LiteralStringRef("immediate")) {
|
||||
priority = TransactionPriority::IMMEDIATE;
|
||||
}
|
||||
else if(tokens[6] == LiteralStringRef("batch")) {
|
||||
priority = TransactionPriority::BATCH;
|
||||
}
|
||||
else {
|
||||
printf("ERROR: unrecognized priority `%s'. Must be one of `default',\n `immediate', or `batch'.\n", tokens[6].toString().c_str());
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
TagSet tags;
|
||||
tags.addTag(tokens[3]);
|
||||
|
||||
wait(ThrottleApi::throttleTags(db, tags, tpsRate, duration, false, TransactionPriority::DEFAULT));
|
||||
wait(ThrottleApi::throttleTags(db, tags, tpsRate, duration, TagThrottleType::MANUAL, priority));
|
||||
printf("Tag `%s' has been throttled\n", tokens[3].toString().c_str());
|
||||
}
|
||||
else if(tokencmp(tokens[1], "off")) {
|
||||
if(tokencmp(tokens[2], "tag") && tokens.size() == 4) {
|
||||
TagSet tags;
|
||||
tags.addTag(tokens[3]);
|
||||
bool success = wait(ThrottleApi::unthrottleTags(db, tags, false, TransactionPriority::DEFAULT)); // TODO: Allow targeting priority and auto/manual
|
||||
if(success) {
|
||||
printf("Unthrottled tag `%s'\n", tokens[3].toString().c_str());
|
||||
int nextIndex = 2;
|
||||
TagSet tags;
|
||||
bool throttleTypeSpecified = false;
|
||||
Optional<TagThrottleType> throttleType = TagThrottleType::MANUAL;
|
||||
Optional<TransactionPriority> priority;
|
||||
|
||||
if(tokens.size() == 2) {
|
||||
is_error = true;
|
||||
}
|
||||
|
||||
while(nextIndex < tokens.size() && !is_error) {
|
||||
if(tokencmp(tokens[nextIndex], "all")) {
|
||||
if(throttleTypeSpecified) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
throttleTypeSpecified = true;
|
||||
throttleType = Optional<TagThrottleType>();
|
||||
++nextIndex;
|
||||
}
|
||||
else {
|
||||
printf("Tag `%s' was not throttled\n", tokens[3].toString().c_str());
|
||||
else if(tokencmp(tokens[nextIndex], "auto")) {
|
||||
if(throttleTypeSpecified) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
throttleTypeSpecified = true;
|
||||
throttleType = TagThrottleType::AUTO;
|
||||
++nextIndex;
|
||||
}
|
||||
else if(tokencmp(tokens[nextIndex], "manual")) {
|
||||
if(throttleTypeSpecified) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
throttleTypeSpecified = true;
|
||||
throttleType = TagThrottleType::MANUAL;
|
||||
++nextIndex;
|
||||
}
|
||||
else if(tokencmp(tokens[nextIndex], "default")) {
|
||||
if(priority.present()) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
priority = TransactionPriority::DEFAULT;
|
||||
++nextIndex;
|
||||
}
|
||||
else if(tokencmp(tokens[nextIndex], "immediate")) {
|
||||
if(priority.present()) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
priority = TransactionPriority::IMMEDIATE;
|
||||
++nextIndex;
|
||||
}
|
||||
else if(tokencmp(tokens[nextIndex], "batch")) {
|
||||
if(priority.present()) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
priority = TransactionPriority::BATCH;
|
||||
++nextIndex;
|
||||
}
|
||||
else if(tokencmp(tokens[nextIndex], "tag")) {
|
||||
if(tags.size() > 0 || nextIndex == tokens.size()-1) {
|
||||
is_error = true;
|
||||
continue;
|
||||
}
|
||||
tags.addTag(tokens[nextIndex+1]);
|
||||
nextIndex += 2;
|
||||
}
|
||||
}
|
||||
else if(tokencmp(tokens[2], "all") && tokens.size() == 3) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAll(db));
|
||||
if(unthrottled) {
|
||||
printf("Unthrottled all tags\n");
|
||||
|
||||
if(!is_error) {
|
||||
state const char *throttleTypeString = !throttleType.present() ? "" : (throttleType.get() == TagThrottleType::AUTO ? "auto-" : "manually ");
|
||||
state std::string priorityString = priority.present() ? format(" at %s priority", transactionPriorityToString(priority.get(), false)) : "";
|
||||
|
||||
if(tags.size() > 0) {
|
||||
bool success = wait(ThrottleApi::unthrottleTags(db, tags, throttleType, priority));
|
||||
if(success) {
|
||||
printf("Unthrottled tag `%s'%s\n", tokens[3].toString().c_str(), priorityString.c_str());
|
||||
}
|
||||
else {
|
||||
printf("Tag `%s' was not %sthrottled%s\n", tokens[3].toString().c_str(), throttleTypeString, priorityString.c_str());
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("There were no tags being throttled\n");
|
||||
}
|
||||
}
|
||||
else if(tokencmp(tokens[2], "auto") && tokens.size() == 3) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAuto(db));
|
||||
if(unthrottled) {
|
||||
printf("Unthrottled all auto-throttled tags\n");
|
||||
}
|
||||
else {
|
||||
printf("There were no tags being throttled\n");
|
||||
}
|
||||
}
|
||||
else if(tokencmp(tokens[2], "manual") && tokens.size() == 3) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleManual(db));
|
||||
if(unthrottled) {
|
||||
printf("Unthrottled all manually throttled tags\n");
|
||||
}
|
||||
else {
|
||||
printf("There were no tags being throttled\n");
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAll(db, throttleType, priority));
|
||||
if(unthrottled) {
|
||||
printf("Unthrottled all %sthrottled tags%s\n", throttleTypeString, priorityString.c_str());
|
||||
}
|
||||
else {
|
||||
printf("There were no tags being %sthrottled%s\n", throttleTypeString, priorityString.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
printf("Usage: throttle off <all|auto|manual|tag> [TAG]\n");
|
||||
printf("Usage: throttle off [all|auto|manual] [tag <TAG>] [PRIORITY]\n");
|
||||
printf("\n");
|
||||
printf("Disables throttling for the specified tag(s).\n");
|
||||
printf("Use `all' to turn off all tag throttles, `auto' to turn off throttles created by\n");
|
||||
printf("the cluster, and `manual' to turn off throttles created manually. Use `tag <TAG>'\n");
|
||||
printf("to turn off throttles for a specific tag\n");
|
||||
is_error = true;
|
||||
printf("Disables throttling for throttles matching the specified filters. At least one filter must be used.\n\n");
|
||||
printf("An optional qualifier `all', `auto', or `manual' can be used to specify the type of throttle\n");
|
||||
printf("affected. `all' targets all throttles, `auto' targets those created by the cluster, and\n");
|
||||
printf("`manual' targets those created manually (default `manual').\n\n");
|
||||
printf("The `tag' filter can be use to turn off only a specific tag.\n\n");
|
||||
printf("The priority filter can be used to turn off only throttles at specific priorities. Choices are\n");
|
||||
printf("`default', `immediate', or `batch'. By default, all priorities are targeted.\n");
|
||||
}
|
||||
}
|
||||
else if((tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) && tokens.size() == 3 && tokencmp(tokens[2], "auto")) {
|
||||
else if(tokencmp(tokens[1], "enable") || tokencmp(tokens[1], "disable")) {
|
||||
if(tokens.size() != 3 || !tokencmp(tokens[2], "auto")) {
|
||||
printf("Usage: throttle <enable|disable> auto\n");
|
||||
printf("\n");
|
||||
|
|
|
@ -73,7 +73,7 @@ Key TagThrottleKey::toKey() const {
|
|||
memcpy(str, tagThrottleKeysPrefix.begin(), tagThrottleKeysPrefix.size());
|
||||
str += tagThrottleKeysPrefix.size();
|
||||
|
||||
*(str++) = autoThrottled ? 1 : 0;
|
||||
*(str++) = (uint8_t)throttleType;
|
||||
*(str++) = (uint8_t)priority;
|
||||
|
||||
for(auto tag : tags) {
|
||||
|
@ -89,7 +89,7 @@ Key TagThrottleKey::toKey() const {
|
|||
|
||||
TagThrottleKey TagThrottleKey::fromKey(const KeyRef& key) {
|
||||
const uint8_t *str = key.substr(tagThrottleKeysPrefix.size()).begin();
|
||||
bool autoThrottled = *(str++) != 0;
|
||||
TagThrottleType throttleType = TagThrottleType(*(str++));
|
||||
TransactionPriority priority = TransactionPriority(*(str++));
|
||||
TagSet tags;
|
||||
|
||||
|
@ -99,7 +99,7 @@ TagThrottleKey TagThrottleKey::fromKey(const KeyRef& key) {
|
|||
str += size;
|
||||
}
|
||||
|
||||
return TagThrottleKey(tags, autoThrottled, priority);
|
||||
return TagThrottleKey(tags, throttleType, priority);
|
||||
}
|
||||
|
||||
TagThrottleValue TagThrottleValue::fromValue(const ValueRef& value) {
|
||||
|
@ -164,9 +164,9 @@ namespace ThrottleApi {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> throttleTags(Database db, TagSet tags, double tpsRate, double initialDuration, bool autoThrottled, TransactionPriority priority, Optional<double> expirationTime) {
|
||||
ACTOR Future<Void> throttleTags(Database db, TagSet tags, double tpsRate, double initialDuration, TagThrottleType throttleType, TransactionPriority priority, Optional<double> expirationTime) {
|
||||
state Transaction tr(db);
|
||||
state Key key = TagThrottleKey(tags, autoThrottled, priority).toKey();
|
||||
state Key key = TagThrottleKey(tags, throttleType, priority).toKey();
|
||||
|
||||
ASSERT(initialDuration > 0);
|
||||
|
||||
|
@ -177,7 +177,7 @@ namespace ThrottleApi {
|
|||
|
||||
loop {
|
||||
try {
|
||||
if(!autoThrottled) {
|
||||
if(throttleType == TagThrottleType::MANUAL) {
|
||||
Optional<Value> oldThrottle = wait(tr.get(key));
|
||||
if(!oldThrottle.present()) {
|
||||
wait(updateThrottleCount(&tr, 1));
|
||||
|
@ -186,7 +186,7 @@ namespace ThrottleApi {
|
|||
|
||||
tr.set(key, value);
|
||||
|
||||
if(!autoThrottled) {
|
||||
if(throttleType == TagThrottleType::MANUAL) {
|
||||
signalThrottleChange(tr);
|
||||
}
|
||||
|
||||
|
@ -199,28 +199,54 @@ namespace ThrottleApi {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<bool> unthrottleTags(Database db, TagSet tags, bool autoThrottled, TransactionPriority priority) {
|
||||
ACTOR Future<bool> unthrottleTags(Database db, TagSet tags, Optional<TagThrottleType> throttleType, Optional<TransactionPriority> priority) {
|
||||
state Transaction tr(db);
|
||||
state Key key = TagThrottleKey(tags, autoThrottled, priority).toKey();
|
||||
state bool removed = false;
|
||||
|
||||
state std::vector<Key> keys;
|
||||
for(auto p : allTransactionPriorities) {
|
||||
if(!priority.present() || priority.get() == p) {
|
||||
if(!throttleType.present() || throttleType.get() == TagThrottleType::AUTO) {
|
||||
keys.push_back(TagThrottleKey(tags, TagThrottleType::AUTO, p).toKey());
|
||||
}
|
||||
if(!throttleType.present() || throttleType.get() == TagThrottleType::MANUAL) {
|
||||
keys.push_back(TagThrottleKey(tags, TagThrottleType::MANUAL, p).toKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state bool removed = false;
|
||||
|
||||
loop {
|
||||
try {
|
||||
state Optional<Value> value = wait(tr.get(key));
|
||||
if(value.present()) {
|
||||
if(!autoThrottled) {
|
||||
wait(updateThrottleCount(&tr, -1));
|
||||
state std::vector<Future<Optional<Value>>> values;
|
||||
for(auto key : keys) {
|
||||
values.push_back(tr.get(key));
|
||||
}
|
||||
|
||||
wait(waitForAll(values));
|
||||
|
||||
int delta = 0;
|
||||
for(int i = 0; i < values.size(); ++i) {
|
||||
if(values[i].get().present()) {
|
||||
if(TagThrottleKey::fromKey(keys[i]).throttleType == TagThrottleType::MANUAL) {
|
||||
delta -= 1;
|
||||
}
|
||||
|
||||
tr.clear(keys[i]);
|
||||
|
||||
// Report that we are removing this tag if we ever see it present.
|
||||
// This protects us from getting confused if the transaction is maybe committed.
|
||||
// It's ok if someone else actually ends up removing this tag at the same time
|
||||
// and we aren't the ones to actually do it.
|
||||
removed = true;
|
||||
}
|
||||
}
|
||||
|
||||
tr.clear(key);
|
||||
if(delta != 0) {
|
||||
wait(updateThrottleCount(&tr, delta));
|
||||
}
|
||||
if(removed) {
|
||||
signalThrottleChange(tr);
|
||||
|
||||
// Report that we are removing this tag if we ever see it present.
|
||||
// This protects us from getting confused if the transaction is maybe committed.
|
||||
// It's ok if someone else actually ends up removing this tag at the same time
|
||||
// and we aren't the ones to actually do it.
|
||||
removed = true;
|
||||
wait(tr.commit());
|
||||
}
|
||||
|
||||
|
@ -232,7 +258,7 @@ namespace ThrottleApi {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<bool> unthrottleTags(Database db, KeyRef beginKey, KeyRef endKey, bool onlyExpiredThrottles) {
|
||||
ACTOR Future<bool> unthrottleMatchingThrottles(Database db, KeyRef beginKey, KeyRef endKey, Optional<TransactionPriority> priority, bool onlyExpiredThrottles) {
|
||||
state Transaction tr(db);
|
||||
|
||||
state KeySelector begin = firstGreaterOrEqual(beginKey);
|
||||
|
@ -253,8 +279,12 @@ namespace ThrottleApi {
|
|||
}
|
||||
}
|
||||
|
||||
bool autoThrottled = TagThrottleKey::fromKey(tag.key).autoThrottled;
|
||||
if(!autoThrottled) {
|
||||
TagThrottleKey key = TagThrottleKey::fromKey(tag.key);
|
||||
if(priority.present() && key.priority != priority.get()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if(key.throttleType == TagThrottleType::MANUAL) {
|
||||
++manualUnthrottledTags;
|
||||
}
|
||||
|
||||
|
@ -285,20 +315,22 @@ namespace ThrottleApi {
|
|||
}
|
||||
}
|
||||
|
||||
Future<bool> unthrottleManual(Database db) {
|
||||
return unthrottleTags(db, tagThrottleKeysPrefix, tagThrottleAutoKeysPrefix, false);
|
||||
}
|
||||
Future<bool> unthrottleAll(Database db, Optional<TagThrottleType> tagThrottleType, Optional<TransactionPriority> priority) {
|
||||
KeyRef begin = tagThrottleKeys.begin;
|
||||
KeyRef end = tagThrottleKeys.end;
|
||||
|
||||
Future<bool> unthrottleAuto(Database db) {
|
||||
return unthrottleTags(db, tagThrottleAutoKeysPrefix, tagThrottleKeys.end, false);
|
||||
}
|
||||
if(tagThrottleType.present() && tagThrottleType == TagThrottleType::AUTO) {
|
||||
begin = tagThrottleAutoKeysPrefix;
|
||||
}
|
||||
else if(tagThrottleType.present() && tagThrottleType == TagThrottleType::MANUAL) {
|
||||
end = tagThrottleAutoKeysPrefix;
|
||||
}
|
||||
|
||||
Future<bool> unthrottleAll(Database db) {
|
||||
return unthrottleTags(db, tagThrottleKeys.begin, tagThrottleKeys.end, false);
|
||||
return unthrottleMatchingThrottles(db, begin, end, priority, false);
|
||||
}
|
||||
|
||||
Future<bool> expire(Database db) {
|
||||
return unthrottleTags(db, tagThrottleKeys.begin, tagThrottleKeys.end, true);
|
||||
return unthrottleMatchingThrottles(db, tagThrottleKeys.begin, tagThrottleKeys.end, Optional<TransactionPriority>(), true);
|
||||
}
|
||||
|
||||
ACTOR Future<Void> enableAuto(Database db, bool enabled) {
|
||||
|
|
|
@ -107,14 +107,19 @@ struct dynamic_size_traits<TagSet> : std::true_type {
|
|||
}
|
||||
};
|
||||
|
||||
enum class TagThrottleType : uint8_t {
|
||||
MANUAL,
|
||||
AUTO
|
||||
};
|
||||
|
||||
struct TagThrottleKey {
|
||||
TagSet tags;
|
||||
bool autoThrottled;
|
||||
TagThrottleType throttleType;
|
||||
TransactionPriority priority;
|
||||
|
||||
TagThrottleKey() : autoThrottled(false), priority(TransactionPriority::DEFAULT) {}
|
||||
TagThrottleKey(TagSet tags, bool autoThrottled, TransactionPriority priority)
|
||||
: tags(tags), autoThrottled(autoThrottled), priority(priority) {}
|
||||
TagThrottleKey() : throttleType(TagThrottleType::MANUAL), priority(TransactionPriority::DEFAULT) {}
|
||||
TagThrottleKey(TagSet tags, TagThrottleType throttleType, TransactionPriority priority)
|
||||
: tags(tags), throttleType(throttleType), priority(priority) {}
|
||||
|
||||
Key toKey() const;
|
||||
static TagThrottleKey fromKey(const KeyRef& key);
|
||||
|
@ -139,17 +144,17 @@ struct TagThrottleValue {
|
|||
|
||||
struct TagThrottleInfo {
|
||||
TransactionTag tag;
|
||||
bool autoThrottled;
|
||||
TagThrottleType throttleType;
|
||||
TransactionPriority priority;
|
||||
double tpsRate;
|
||||
double expirationTime;
|
||||
double initialDuration;
|
||||
|
||||
TagThrottleInfo(TransactionTag tag, bool autoThrottled, TransactionPriority priority, double tpsRate, double expirationTime, double initialDuration)
|
||||
: tag(tag), autoThrottled(autoThrottled), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration) {}
|
||||
TagThrottleInfo(TransactionTag tag, TagThrottleType throttleType, TransactionPriority priority, double tpsRate, double expirationTime, double initialDuration)
|
||||
: tag(tag), throttleType(throttleType), priority(priority), tpsRate(tpsRate), expirationTime(expirationTime), initialDuration(initialDuration) {}
|
||||
|
||||
TagThrottleInfo(TagThrottleKey key, TagThrottleValue value)
|
||||
: autoThrottled(key.autoThrottled), priority(key.priority), tpsRate(value.tpsRate), expirationTime(value.expirationTime), initialDuration(value.initialDuration)
|
||||
: throttleType(key.throttleType), priority(key.priority), tpsRate(value.tpsRate), expirationTime(value.expirationTime), initialDuration(value.initialDuration)
|
||||
{
|
||||
ASSERT(key.tags.size() == 1); // Multiple tags per throttle is not currently supported
|
||||
tag = *key.tags.begin();
|
||||
|
@ -160,13 +165,11 @@ namespace ThrottleApi {
|
|||
Future<std::vector<TagThrottleInfo>> getThrottledTags(Database const& db, int const& limit);
|
||||
|
||||
Future<Void> throttleTags(Database const& db, TagSet const& tags, double const& tpsRate, double const& initialDuration,
|
||||
bool const& autoThrottled, TransactionPriority const& priority, Optional<double> const& expirationTime = Optional<double>());
|
||||
TagThrottleType const& throttleType, TransactionPriority const& priority, Optional<double> const& expirationTime = Optional<double>());
|
||||
|
||||
Future<bool> unthrottleTags(Database const& db, TagSet const& tags, bool const& autoThrottled, TransactionPriority const& priority);
|
||||
Future<bool> unthrottleTags(Database const& db, TagSet const& tags, Optional<TagThrottleType> const& throttleType, Optional<TransactionPriority> const& priority);
|
||||
|
||||
Future<bool> unthrottleManual(Database db);
|
||||
Future<bool> unthrottleAuto(Database db);
|
||||
Future<bool> unthrottleAll(Database db);
|
||||
Future<bool> unthrottleAll(Database db, Optional<TagThrottleType> throttleType, Optional<TransactionPriority> priority);
|
||||
Future<bool> expire(Database db);
|
||||
|
||||
Future<Void> enableAuto(Database const& db, bool const& enabled);
|
||||
|
|
|
@ -762,7 +762,7 @@ ACTOR Future<Void> monitorThrottlingChanges(RatekeeperData *self) {
|
|||
TransactionTag tag = *tagKey.tags.begin();
|
||||
Optional<ClientTagThrottleLimits> oldLimits = self->throttledTags.getManualTagThrottleLimits(tag, tagKey.priority);
|
||||
|
||||
if(tagKey.autoThrottled) {
|
||||
if(tagKey.throttleType == TagThrottleType::AUTO) {
|
||||
updatedTagThrottles.autoThrottleTag(tag, 0, tagValue.tpsRate, tagValue.expirationTime);
|
||||
}
|
||||
else {
|
||||
|
@ -799,7 +799,7 @@ void tryAutoThrottleTag(RatekeeperData *self, StorageQueueInfo const& ss, RkTagT
|
|||
TagSet tags;
|
||||
tags.addTag(ss.busiestTag.get());
|
||||
|
||||
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, true, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
|
||||
self->addActor.send(ThrottleApi::throttleTags(self->db, tags, clientRate.get(), SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION, TagThrottleType::AUTO, TransactionPriority::DEFAULT, now() + SERVER_KNOBS->AUTO_TAG_THROTTLE_DURATION));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,22 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
|
||||
virtual void getMetrics(vector<PerfMetric>& m) {}
|
||||
|
||||
static Optional<TagThrottleType> randomTagThrottleType() {
|
||||
Optional<TagThrottleType> throttleType;
|
||||
switch(deterministicRandom()->randomInt(0, 3)) {
|
||||
case 0:
|
||||
throttleType = TagThrottleType::AUTO;
|
||||
break;
|
||||
case 1:
|
||||
throttleType = TagThrottleType::MANUAL;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return throttleType;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> throttleTag(Database cx, std::map<std::pair<TransactionTag, TransactionPriority>, TagThrottleInfo> *manuallyThrottledTags) {
|
||||
state TransactionTag tag = TransactionTagRef(deterministicRandom()->randomChoice(DatabaseContext::debugTransactionTagChoices));
|
||||
state TransactionPriority priority = deterministicRandom()->randomChoice(allTransactionPriorities);
|
||||
|
@ -60,7 +76,7 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
tagSet.addTag(tag);
|
||||
|
||||
try {
|
||||
wait(ThrottleApi::throttleTags(cx, tagSet, rate, duration, false, priority));
|
||||
wait(ThrottleApi::throttleTags(cx, tagSet, rate, duration, TagThrottleType::MANUAL, priority));
|
||||
}
|
||||
catch(Error &e) {
|
||||
state Error err = e;
|
||||
|
@ -72,7 +88,7 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
throw err;
|
||||
}
|
||||
|
||||
manuallyThrottledTags->insert_or_assign(std::make_pair(tag, priority), TagThrottleInfo(tag, false, priority, rate, now() + duration, duration));
|
||||
manuallyThrottledTags->insert_or_assign(std::make_pair(tag, priority), TagThrottleInfo(tag, TagThrottleType::MANUAL, priority, rate, now() + duration, duration));
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
@ -82,26 +98,30 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
TagSet tagSet;
|
||||
tagSet.addTag(tag);
|
||||
|
||||
state bool autoThrottled = deterministicRandom()->coinflip();
|
||||
TransactionPriority priority = deterministicRandom()->randomChoice(allTransactionPriorities);
|
||||
state Optional<TagThrottleType> throttleType = TagThrottleApiWorkload::randomTagThrottleType();
|
||||
Optional<TransactionPriority> priority = deterministicRandom()->coinflip() ? Optional<TransactionPriority>() : deterministicRandom()->randomChoice(allTransactionPriorities);
|
||||
|
||||
state bool erased = false;
|
||||
state double expiration = 0;
|
||||
if(!autoThrottled) {
|
||||
auto itr = manuallyThrottledTags->find(std::make_pair(tag, priority));
|
||||
if(itr != manuallyThrottledTags->end()) {
|
||||
expiration = itr->second.expirationTime;
|
||||
erased = true;
|
||||
manuallyThrottledTags->erase(itr);
|
||||
state double maxExpiration = 0;
|
||||
if(!throttleType.present() || throttleType.get() == TagThrottleType::MANUAL) {
|
||||
for(auto p : allTransactionPriorities) {
|
||||
if(!priority.present() || priority.get() == p) {
|
||||
auto itr = manuallyThrottledTags->find(std::make_pair(tag, p));
|
||||
if(itr != manuallyThrottledTags->end()) {
|
||||
maxExpiration = std::max(maxExpiration, itr->second.expirationTime);
|
||||
erased = true;
|
||||
manuallyThrottledTags->erase(itr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool removed = wait(ThrottleApi::unthrottleTags(cx, tagSet, autoThrottled, priority));
|
||||
bool removed = wait(ThrottleApi::unthrottleTags(cx, tagSet, throttleType, priority));
|
||||
if(removed) {
|
||||
ASSERT(erased || autoThrottled);
|
||||
ASSERT(erased || !throttleType.present() || throttleType.get() == TagThrottleType::AUTO);
|
||||
}
|
||||
else {
|
||||
ASSERT(expiration < now());
|
||||
ASSERT(maxExpiration < now());
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -113,7 +133,7 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
int manualThrottledTags = 0;
|
||||
int activeAutoThrottledTags = 0;
|
||||
for(auto &tag : tags) {
|
||||
if(!tag.autoThrottled) {
|
||||
if(tag.throttleType == TagThrottleType::MANUAL) {
|
||||
ASSERT(manuallyThrottledTags->find(std::make_pair(tag.tag, tag.priority)) != manuallyThrottledTags->end());
|
||||
++manualThrottledTags;
|
||||
}
|
||||
|
@ -139,34 +159,32 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
}
|
||||
|
||||
ACTOR Future<Void> unthrottleTagGroup(Database cx, std::map<std::pair<TransactionTag, TransactionPriority>, TagThrottleInfo> *manuallyThrottledTags) {
|
||||
state int choice = deterministicRandom()->randomInt(0, 3);
|
||||
state Optional<TagThrottleType> throttleType = TagThrottleApiWorkload::randomTagThrottleType();
|
||||
state Optional<TransactionPriority> priority = deterministicRandom()->coinflip() ? Optional<TransactionPriority>() : deterministicRandom()->randomChoice(allTransactionPriorities);
|
||||
|
||||
if(choice == 0) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAll(cx));
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAll(cx, throttleType, priority));
|
||||
if(!throttleType.present() || throttleType.get() == TagThrottleType::MANUAL) {
|
||||
bool unthrottleExpected = false;
|
||||
for(auto itr = manuallyThrottledTags->begin(); itr != manuallyThrottledTags->end(); ++itr) {
|
||||
if(itr->second.expirationTime > now()) {
|
||||
unthrottleExpected = true;
|
||||
bool empty = manuallyThrottledTags->empty();
|
||||
for(auto itr = manuallyThrottledTags->begin(); itr != manuallyThrottledTags->end();) {
|
||||
if(!priority.present() || priority.get() == itr->first.second) {
|
||||
if(itr->second.expirationTime > now()) {
|
||||
unthrottleExpected = true;
|
||||
}
|
||||
|
||||
itr = manuallyThrottledTags->erase(itr);
|
||||
}
|
||||
else {
|
||||
++itr;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(!unthrottleExpected || unthrottled);
|
||||
manuallyThrottledTags->clear();
|
||||
}
|
||||
else if(choice == 1) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleManual(cx));
|
||||
bool unthrottleExpected = false;
|
||||
for(auto itr = manuallyThrottledTags->begin(); itr != manuallyThrottledTags->end(); ++itr) {
|
||||
if(itr->second.expirationTime > now()) {
|
||||
unthrottleExpected = true;
|
||||
}
|
||||
if(throttleType.present()) {
|
||||
ASSERT((unthrottled && !empty) || (!unthrottled && !unthrottleExpected));
|
||||
}
|
||||
else {
|
||||
ASSERT(unthrottled || !unthrottleExpected);
|
||||
}
|
||||
|
||||
ASSERT((unthrottled && !manuallyThrottledTags->empty()) || (!unthrottled && !unthrottleExpected));
|
||||
manuallyThrottledTags->clear();
|
||||
}
|
||||
else {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAuto(cx));
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
@ -176,7 +194,7 @@ struct TagThrottleApiWorkload : TestWorkload {
|
|||
if(deterministicRandom()->coinflip()) {
|
||||
wait(ThrottleApi::enableAuto(cx, true));
|
||||
if(deterministicRandom()->coinflip()) {
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAuto(cx));
|
||||
bool unthrottled = wait(ThrottleApi::unthrottleAll(cx, TagThrottleType::AUTO, Optional<TransactionPriority>()));
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
Loading…
Reference in New Issue