Merge pull request #8437 from sfc-gh-tclinkenbeard/add-quota-clear-command

Add fdbcli "quota clear" command
This commit is contained in:
Jingyu Zhou 2022-10-18 15:11:05 -07:00 committed by GitHub
commit 14d13475db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 101 additions and 17 deletions

View File

@ -47,6 +47,12 @@ Note that the quotas are specified in terms of bytes/second, and internally conv
page_cost_quota = ceiling(byte_quota / CLIENT_KNOBS->READ_COST_BYTE_FACTOR)
```
To clear a both reserved and total throughput quotas for a tag, run:
```
fdbcli> quota clear <tag>
```
### Limit Calculation
The transaction budget that ratekeeper calculates and distributes to clients (via GRV proxies) for each tag is calculated based on several intermediate rate calculations, outlined in this section.

View File

@ -56,7 +56,7 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
loop {
tr->setOption(FDBTransactionOptions::READ_SYSTEM_KEYS);
try {
state ThreadFuture<Optional<Value>> resultFuture = tr->get(tag.withPrefix(tagQuotaPrefix));
state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
if (!v.present()) {
fmt::print("<empty>\n");
@ -77,11 +77,10 @@ ACTOR Future<Void> getQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitType limitType, double value) {
state Reference<ITransaction> tr = db->createTransaction();
state Key key = tag.withPrefix(tagQuotaPrefix);
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
state ThreadFuture<Optional<Value>> resultFuture = tr->get(key);
state ThreadFuture<Optional<Value>> resultFuture = tr->get(ThrottleApi::getTagQuotaKey(tag));
Optional<Value> v = wait(safeThreadFutureToFuture(resultFuture));
ThrottleApi::TagQuotaValue quota;
if (v.present()) {
@ -103,8 +102,22 @@ ACTOR Future<Void> setQuota(Reference<IDatabase> db, TransactionTag tag, LimitTy
}
}
ACTOR Future<Void> clearQuota(Reference<IDatabase> db, TransactionTag tag) {
state Reference<ITransaction> tr = db->createTransaction();
loop {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
try {
tr->clear(ThrottleApi::getTagQuotaKey(tag));
wait(safeThreadFutureToFuture(tr->commit()));
return Void();
} catch (Error& e) {
wait(safeThreadFutureToFuture(tr->onError(e)));
}
}
}
constexpr auto usage = "quota [get <tag> [reserved_throughput|total_throughput] | set <tag> "
"[reserved_throughput|total_throughput] <value>]";
"[reserved_throughput|total_throughput] <value> | clear <tag>]";
bool exitFailure() {
fmt::print(usage);
@ -117,30 +130,40 @@ namespace fdb_cli {
ACTOR Future<bool> quotaCommandActor(Reference<IDatabase> db, std::vector<StringRef> tokens) {
state bool result = true;
if (tokens.size() != 5 && tokens.size() != 6) {
if (tokens.size() < 3 || tokens.size() > 5) {
return exitFailure();
} else {
auto tag = parseTag(tokens[2]);
auto limitType = parseLimitType(tokens[3]);
if (!tag.present() || !limitType.present()) {
auto const tag = parseTag(tokens[2]);
if (!tag.present()) {
return exitFailure();
}
if (tokens[1] == "get"_sr) {
if (tokens.size() != 4) {
return exitFailure();
}
auto const limitType = parseLimitType(tokens[3]);
if (!limitType.present()) {
return exitFailure();
}
wait(getQuota(db, tag.get(), limitType.get()));
return true;
} else if (tokens[1] == "set"_sr) {
if (tokens.size() != 5) {
return exitFailure();
}
auto const limitType = parseLimitType(tokens[3]);
auto const limitValue = parseLimitValue(tokens[4]);
if (!limitValue.present()) {
if (!limitType.present() || !limitValue.present()) {
return exitFailure();
}
wait(setQuota(db, tag.get(), limitType.get(), limitValue.get()));
return true;
} else if (tokens[1] == "clear"_sr) {
if (tokens.size() != 3) {
return exitFailure();
}
wait(clearQuota(db, tag.get()));
return true;
} else {
return exitFailure();
}

View File

@ -542,8 +542,8 @@ void initHelp() {
"Displays the current read version of the database or currently running transaction.");
helpMap["quota"] = CommandHelp("quota",
"quota [get <tag> [reserved_throughput|total_throughput] | set <tag> "
"[reserved_throughput|total_throughput] <value>]",
"Get or modify the throughput quota for the specified tag.");
"[reserved_throughput|total_throughput] <value> | clear <tag>]",
"Get, modify, or clear the throughput quota for the specified tag.");
helpMap["reset"] =
CommandHelp("reset",
"reset the current transaction",

View File

@ -103,6 +103,59 @@ def maintenance(logger):
output3 = run_fdbcli_command('maintenance')
assert output3 == no_maintenance_output
@enable_logging()
def quota(logger):
# Should be a noop
command = 'quota clear green'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == ''
command = 'quota get green total_throughput'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == '<empty>'
# Ignored update
command = 'quota set red total_throughput 49152'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == ''
command = 'quota set green total_throughput 32768'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == ''
command = 'quota set green reserved_throughput 16384'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == ''
command = 'quota get green total_throughput'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == '32768'
command = 'quota get green reserved_throughput'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == '16384'
command = 'quota clear green'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == ''
command = 'quota get green total_throughput'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
assert output == '<empty>'
# Too few arguments, should log help message
command = 'quota get green'
output = run_fdbcli_command(command)
logger.debug(command + ' : ' + output)
@enable_logging()
def setclass(logger):
@ -1035,6 +1088,7 @@ if __name__ == '__main__':
integer_options()
tls_address_suffix()
knobmanagement()
quota()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()

View File

@ -145,13 +145,13 @@ Value ThrottleApi::TagQuotaValue::toValue() const {
ThrottleApi::TagQuotaValue ThrottleApi::TagQuotaValue::fromValue(ValueRef value) {
auto tuple = Tuple::unpack(value);
if (tuple.size() != 4) {
if (tuple.size() != 2) {
throw invalid_throttle_quota_value();
}
TagQuotaValue result;
try {
result.reservedQuota = tuple.getDouble(0);
result.totalQuota = tuple.getDouble(1);
result.reservedQuota = tuple.getInt(0);
result.totalQuota = tuple.getInt(1);
} catch (Error& e) {
TraceEvent(SevWarnAlways, "TagQuotaValueFailedToDeserialize").error(e);
throw invalid_throttle_quota_value();

View File

@ -597,8 +597,8 @@ Future<Void> enableAuto(Reference<DB> db, bool enabled) {
class TagQuotaValue {
public:
double reservedQuota{ 0.0 };
double totalQuota{ 0.0 };
int64_t reservedQuota{ 0 };
int64_t totalQuota{ 0 };
bool isValid() const;
Value toValue() const;
static TagQuotaValue fromValue(ValueRef);

View File

@ -2802,6 +2802,7 @@ static std::set<int> const& normalWorkerErrors() {
if (s.empty()) {
s.insert(error_code_please_reboot);
s.insert(error_code_please_reboot_delete);
s.insert(error_code_local_config_changed);
}
return s;
}

View File

@ -83,7 +83,7 @@ Future<Optional<T>> stopAfter(Future<T> what) {
ret = Optional<T>(_);
} catch (Error& e) {
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_please_reboot_delete ||
e.code() == error_code_actor_cancelled;
e.code() == error_code_actor_cancelled || e.code() == error_code_local_config_changed;
TraceEvent(ok ? SevInfo : SevError, "StopAfterError").error(e);
if (!ok) {
fprintf(stderr, "Fatal Error: %s\n", e.what());