fdbcli: Add options for knob management

- setknob <knob_name> <knob_value> [config_class]
- getknob <knob_name> [config_class]
- Added new option to begin to specify if it's a configuration txn. Syntax is begin [config-txn]
- Added utility function for converting tuples to string
- Added knobmanagment test in fdbcli_tests.py
This commit is contained in:
Kevin Hoxha 2022-10-04 15:25:56 -07:00 committed by Lukas Joswiak
parent 2ba98f3a60
commit ff1b2df8f6
7 changed files with 296 additions and 22 deletions

View File

@ -499,11 +499,14 @@ void initHelp() {
"transaction, and are automatically committed for you. By explicitly beginning a transaction, "
"successive operations are all performed as part of a single transaction.\n\nTo commit the "
"transaction, use the commit command. To discard the transaction, use the reset command.");
helpMap["commit"] = CommandHelp("commit",
helpMap["commit"] = CommandHelp("commit [description]",
"commit the current transaction",
"Any sets or clears executed after the start of the current transaction will be "
"committed to the database. On success, the committed version number is displayed. "
"If commit fails, the error is displayed and the transaction must be retried.");
"If commit fails, the error is displayed and the transaction must be retried. The "
"command optionally allows for a description in case the transaction targets the "
"configuration database. If no description is provided in the command, a prompt "
"will be shown asking for a relevant description of the configuration change");
helpMap["clear"] = CommandHelp(
"clear <KEY>",
"clear a key from the database",
@ -552,6 +555,14 @@ void initHelp() {
helpMap["set"] = CommandHelp("set <KEY> <VALUE>",
"set a value for a given key",
"If KEY is not already present in the database, it will be created." ESCAPINGKV);
helpMap["setknob"] = CommandHelp("setknob <KEY> <VALUE> [CONFIG_CLASS]",
"updates a knob to specified value",
"setknob will prompt for a descrption of the changes" ESCAPINGKV);
helpMap["getknob"] = CommandHelp(
"getknob <KEY> [CONFIG_CLASS]", "gets the value of the specified knob", "CONFIG_CLASS is optional." ESCAPINGK);
helpMap["option"] = CommandHelp(
"option <STATE> <OPTION> <ARG>",
"enables or disables an option",
@ -1050,12 +1061,17 @@ Future<T> stopNetworkAfter(Future<T> what) {
}
}
enum TransType { Db = 0, Config, None };
ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterConnectionFile> ccf) {
state LineNoise& linenoise = *plinenoise;
state bool intrans = false;
state TransType transtype = TransType::None;
state bool isCommitDesc = false;
state Database localDb;
state Reference<IDatabase> db;
state Reference<IDatabase> configDb;
state Reference<ITenant> tenant;
state Optional<TenantName> tenantName;
state Optional<TenantMapEntry> tenantEntry;
@ -1064,6 +1080,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
state const Reference<ITenant> managementTenant;
state Reference<ITransaction> tr;
state Reference<ITransaction> config_tr;
state Transaction trx;
state bool writeMode = false;
@ -1085,6 +1102,8 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printf("Using cluster file `%s'.\n", ccf->getLocation().c_str());
}
db = API->createDatabase(opt.clusterFile.c_str());
configDb = API->createDatabase(opt.clusterFile.c_str());
configDb->setOption(FDBDatabaseOptions::USE_CONFIG_DATABASE);
} catch (Error& e) {
fprintf(stderr, "ERROR: %s (%d)\n", e.what(), e.code());
printf("Unable to connect to cluster from `%s'\n", ccf->getLocation().c_str());
@ -1442,23 +1461,46 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
} else {
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
getTransaction(db, tenant, tr, options, false);
intrans = true;
transtype = TransType::None;
getTransaction(db, tenant, tr, options, false);
printf("Transaction started\n");
}
continue;
}
if (tokencmp(tokens[0], "commit")) {
if (tokens.size() != 1) {
if (tokens.size() > 2) {
printUsage(tokens[0]);
is_error = true;
} else if (!intrans) {
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
wait(commitTransaction(tr));
if (isCommitDesc && tokens.size() == 1) {
// prompt for description and add to txn
state Optional<std::string> raw;
while (!raw.present() || raw.get().empty()) {
fprintf(stdout,
"Please set a description for the change. Description must be non-empty.\n");
state Optional<std::string> rawline =
wait(makeInterruptable(linenoise.read("description: ")));
raw = rawline;
}
std::string line = raw.get();
config_tr->set("\xff\xff/description"_sr, line);
}
if (transtype == TransType::Db) {
wait(commitTransaction(tr));
} else {
if (tokens.size() > 1) {
config_tr->set("\xff\xff/description"_sr, tokens[1]);
}
wait(commitTransaction(config_tr));
}
isCommitDesc = false;
intrans = false;
transtype = TransType::None;
options = &globalOptions;
}
@ -1481,10 +1523,16 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
fprintf(stderr, "ERROR: No active transaction\n");
is_error = true;
} else {
tr->reset();
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
options->apply(tr);
if (transtype == TransType::Config) {
config_tr->reset();
} else {
tr->reset();
activeOptions = FdbOptions(globalOptions);
options = &activeOptions;
options->apply(tr);
}
isCommitDesc = false;
transtype = TransType::None;
printf("Transaction reset\n");
}
continue;
@ -1510,6 +1558,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform get in configuration transaction\n");
is_error = true;
continue;
}
}
state ThreadFuture<Optional<Value>> valueF =
getTransaction(db, tenant, tr, options, intrans)->get(tokens[1]);
Optional<Standalone<StringRef>> v = wait(makeInterruptable(safeThreadFutureToFuture(valueF)));
@ -1618,7 +1675,17 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
} else {
state int limit;
bool valid = true;
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(
stderr,
"ERROR: Cannot perform getrange or getrangekeys in configuration transaction\n");
is_error = true;
continue;
}
}
if (tokens.size() == 4) {
// INT_MAX is 10 digits; rather than
// worrying about overflow we'll just cap
@ -1707,6 +1774,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform set in configuration transaction\n");
is_error = true;
continue;
}
}
getTransaction(db, tenant, tr, options, intrans);
tr->set(tokens[1], tokens[2]);
@ -1717,6 +1793,91 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
continue;
}
if (tokencmp(tokens[0], "setknob")) {
if (tokens.size() > 4 || tokens.size() < 3) {
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Config;
} else if (transtype == TransType::Db) {
fprintf(stderr, "ERROR: Cannot perform setknob in database transaction\n");
is_error = true;
isCommitDesc = false;
continue;
}
}
Tuple t;
if (tokens.size() == 4) {
t.append(tokens[3]);
} else {
t.appendNull();
}
t.append(tokens[1]);
getTransaction(configDb, tenant, config_tr, options, intrans);
config_tr->set(t.pack(), tokens[2]);
if (!intrans) {
// prompt for description and add to txn
state Optional<std::string> raw_desc;
while (!raw_desc.present() || raw_desc.get().empty()) {
fprintf(stdout,
"Please set a description for the change. Description must be non-empty\n");
state Optional<std::string> rawline_knob =
wait(makeInterruptable(linenoise.read("description: ")));
raw_desc = rawline_knob;
}
std::string line = raw_desc.get();
config_tr->set("\xff\xff/description"_sr, line);
wait(commitTransaction(config_tr));
} else {
isCommitDesc = true;
}
}
continue;
}
if (tokencmp(tokens[0], "getknob")) {
if (tokens.size() > 3 || tokens.size() < 2) {
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Config;
} else if (transtype == TransType::Db) {
fprintf(stderr, "ERROR: Cannot perform getknob in database transaction\n");
is_error = true;
continue;
}
}
Tuple t;
if (tokens.size() == 2) {
t.appendNull();
} else {
t.append(tokens[2]);
}
t.append(tokens[1]);
state ThreadFuture<Optional<Value>> valueF_knob =
getTransaction(configDb, tenant, config_tr, options, intrans)->get(t.pack());
Optional<Standalone<StringRef>> v =
wait(makeInterruptable(safeThreadFutureToFuture(valueF_knob)));
std::string knob_class = printable(tokens[1]);
if (tokens.size() == 3) {
std::string config_class = (" in configuration class " + printable(tokens[2]));
knob_class += config_class;
}
if (v.present())
printf("`%s' is `%s'\n",
knob_class.c_str(),
Tuple::tupleToString(Tuple::unpack(v.get())).c_str());
else
printf("`%s' is not found\n", knob_class.c_str());
}
continue;
}
if (tokencmp(tokens[0], "clear")) {
if (!writeMode) {
fprintf(stderr, "ERROR: writemode must be enabled to set or clear keys in the database.\n");
@ -1728,6 +1889,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform clear in configuration transaction\n");
is_error = true;
continue;
}
}
getTransaction(db, tenant, tr, options, intrans);
tr->clear(tokens[1]);
@ -1749,6 +1919,15 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
printUsage(tokens[0]);
is_error = true;
} else {
if (intrans) {
if (transtype == TransType::None) {
transtype = TransType::Db;
} else if (transtype == TransType::Config) {
fprintf(stderr, "ERROR: Cannot perform clearrange in configuration transaction\n");
is_error = true;
continue;
}
}
getTransaction(db, tenant, tr, options, intrans);
tr->clear(KeyRangeRef(tokens[1], tokens[2]));
@ -1928,7 +2107,6 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise, Reference<ClusterCo
}
TraceEvent(SevInfo, "CLICommandLog", randomID).detail("Command", line).detail("IsError", is_error);
} catch (Error& e) {
if (e.code() == error_code_operation_cancelled) {
throw;

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
from cProfile import run
import sys
import shutil
import os
@ -334,6 +335,57 @@ def consistencycheck(logger):
assert output3 == consistency_check_on_output
@enable_logging()
def knobmanagement(logger):
# this test will set knobs and verify that the knobs are properly set
# must use begin/commit to avoid prompt for description
# Incorrect arguments
output = run_fdbcli_command('setknob')
assert output == "Usage: setknob <KEY> <VALUE> [CONFIG_CLASS]"
output = run_fdbcli_command('setknob', 'min_trace_severity')
assert output == "Usage: setknob <KEY> <VALUE> [CONFIG_CLASS]"
output = run_fdbcli_command('getknob')
assert output == "Usage: getknob <KEY> [CONFIG_CLASS]"
logger.debug("incorrect args passed")
# Invalid knob name
err = run_fdbcli_command_and_get_error('begin; setknob dummy_knob 20; commit \"fdbcli change\";')
logger.debug("err is: {}".format(err))
assert len(err) > 0
logger.debug("invalid knob name passed")
# Invalid type for knob
err = run_fdbcli_command_and_get_error('begin; setknob min_trace_severity dummy-text; commit \"fdbcli change\";')
logger.debug("err is: {}".format(err))
assert len(err) > 0
logger.debug("invalid knob type passed")
# Verifying we can't do a normal set, clear, get, getrange, clearrange
# with a setknob
err = run_fdbcli_command_and_get_error('writemode on; begin; set foo bar; setknob max_metric_size 1000; commit;')
logger.debug("err is: {}".format(err))
assert len(err) > 0
err = run_fdbcli_command_and_get_error('writemode on; begin; clear foo; setknob max_metric_size 1000; commit')
logger.debug("err is: {}".format(err))
assert len(err) > 0
# Various setknobs and verified by getknob
output = run_fdbcli_command('begin; setknob min_trace_severity 30; setknob max_metric_size 1000; \
setknob tracing_udp_listener_addr 192.168.0.1; \
setknob tracing_sample_rate 0.3; \
commit \"This is an fdbcli test for knobs\";')
assert "Committed" in output
output = run_fdbcli_command('getknob', 'min_trace_severity')
assert r"`min_trace_severity' is `30'" == output
output = run_fdbcli_command('getknob', 'max_metric_size')
assert r"`max_metric_size' is `1000'" == output
output = run_fdbcli_command('getknob', 'tracing_udp_listener_addr')
assert r"`tracing_udp_listener_addr' is `'192.168.0.1''" == output
output = run_fdbcli_command('getknob', 'tracing_sample_rate')
assert r"`tracing_sample_rate' is `0.300000'" == output
@enable_logging()
def cache_range(logger):
# this command is currently experimental
@ -983,6 +1035,7 @@ if __name__ == '__main__':
versionepoch()
integer_options()
tls_address_suffix()
knobmanagement()
else:
assert args.process_number > 1, "Process number should be positive"
coordinators()

View File

@ -44,19 +44,20 @@ ConfigKey ConfigKeyRef::decodeKey(KeyRef const& key) {
}
Value KnobValueRef::ToValueFunc::operator()(int v) const {
return BinaryWriter::toValue(v, Unversioned());
// return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(int64_t v) const {
return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(bool v) const {
return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(ValueRef v) const {
return v;
return Tuple::makeTuple(v).pack();
}
Value KnobValueRef::ToValueFunc::operator()(double v) const {
return BinaryWriter::toValue(v, Unversioned());
return Tuple::makeTuple(v).pack();
}
KnobValue KnobValueRef::CreatorFunc::operator()(NoKnobFound) const {

View File

@ -99,6 +99,48 @@ Tuple Tuple::unpack(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete);
}
std::string Tuple::tupleToString(const Tuple& tuple) {
std::string str;
if (tuple.size() > 1) {
str += "(";
}
for (int i = 0; i < tuple.size(); ++i) {
Tuple::ElementType type = tuple.getType(i);
if (type == Tuple::NULL_TYPE) {
str += "NULL";
} else if (type == Tuple::BYTES || type == Tuple::UTF8) {
if (type == Tuple::UTF8) {
str += "u";
}
str += "\'" + tuple.getString(i).printable() + "\'";
} else if (type == Tuple::INT) {
str += format("%ld", tuple.getInt(i));
} else if (type == Tuple::FLOAT) {
str += format("%f", tuple.getFloat(i));
} else if (type == Tuple::DOUBLE) {
str += format("%f", tuple.getDouble(i));
} else if (type == Tuple::BOOL) {
str += tuple.getBool(i) ? "true" : "false";
} else if (type == Tuple::VERSIONSTAMP) {
TupleVersionstamp versionstamp = tuple.getVersionstamp(i);
str += format("Transaction Version: '%ld', BatchNumber: '%hd', UserVersion : '%hd'",
versionstamp.getVersion(),
versionstamp.getBatchNumber(),
versionstamp.getUserVersion());
} else {
ASSERT(false);
}
if (i < tuple.size() - 1) {
str += ", ";
}
}
if (tuple.size() > 1) {
str += ")";
}
return str;
}
Tuple Tuple::unpackUserType(StringRef const& str, bool exclude_incomplete) {
return Tuple(str, exclude_incomplete, true);
}

View File

@ -48,6 +48,7 @@ struct Tuple {
// Note that strings can't be incomplete because they are parsed such that the end of the packed
// byte string is considered the end of the string in lieu of a specific end.
static Tuple unpack(StringRef const& str, bool exclude_incomplete = false);
static std::string tupleToString(Tuple const& tuple);
static Tuple unpackUserType(StringRef const& str, bool exclude_incomplete = false);
Tuple& append(Tuple const& tuple);

View File

@ -2727,8 +2727,7 @@ ACTOR Future<Void> workerServer(Reference<IClusterConnectionRecord> connRecord,
f.cancel();
state Error e = err;
bool ok = e.code() == error_code_please_reboot || e.code() == error_code_actor_cancelled ||
e.code() == error_code_please_reboot_delete;
e.code() == error_code_please_reboot_delete || e.code() == error_code_local_config_changed;
endRole(Role::WORKER, interf.id(), "WorkerError", ok, e);
errorForwarders.clear(false);
sharedLogs.clear();

View File

@ -85,8 +85,8 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( WRITE_TRACING_ENABLED, true ); if( randomize && BUGGIFY ) WRITE_TRACING_ENABLED = false;
init( TRACING_SPAN_ATTRIBUTES_ENABLED, false ); // Additional K/V and tenant data added to Span Attributes
init( TRACING_SAMPLE_RATE, 0.0 ); // Fraction of distributed traces (not spans) to sample (0 means ignore all traces)
init( TRACING_UDP_LISTENER_ADDR, "127.0.0.1" ); // Only applicable if TracerType is set to a network option
init( TRACING_SAMPLE_RATE, 0.0); // Fraction of distributed traces (not spans) to sample (0 means ignore all traces)
init( TRACING_UDP_LISTENER_ADDR, "127.0.0.1"); // Only applicable if TracerType is set to a network option
init( TRACING_UDP_LISTENER_PORT, 8889 ); // Only applicable if TracerType is set to a network option
//connectionMonitor
@ -229,7 +229,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
init( ZERO_LENGTH_FILE_PAD, 1 );
init( TRACE_FLUSH_INTERVAL, 0.25 );
init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10 ); // Related to the trace severity in Trace.h
init( MIN_TRACE_SEVERITY, isSimulated ? 1 : 10, Atomic::NO ); // Related to the trace severity in Trace.h
init( MAX_TRACE_SUPPRESSIONS, 1e4 );
init( TRACE_DATETIME_ENABLED, true ); // trace time in human readable format (always real time)
init( TRACE_SYNC_ENABLED, 0 );
@ -245,7 +245,7 @@ void FlowKnobs::initialize(Randomize randomize, IsSimulated isSimulated) {
//TDMetrics
init( MAX_METRICS, 600 );
init( MAX_METRIC_SIZE, 2500 );
init( MAX_METRIC_SIZE, 2500, Atomic::NO );
init( MAX_METRIC_LEVEL, 25 );
init( METRIC_LEVEL_DIVISOR, log(4) );
init( METRIC_LIMIT_START_QUEUE_SIZE, 10 ); // The queue size at which to start restricting logging by disabling levels