Merge pull request #4996 from sfc-gh-clin/add-fdbcli-tests

Add test coverage for more fdbcli commands
This commit is contained in:
Chaoguang Lin 2021-06-25 12:01:45 -07:00 committed by GitHub
commit c559621ac6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 273 additions and 9 deletions

View File

@ -4,6 +4,9 @@ import sys
import subprocess
import logging
import functools
import json
import time
import random
def enable_logging(level=logging.ERROR):
"""Enable logging in the function with the specified logging level
@ -24,7 +27,7 @@ def enable_logging(level=logging.ERROR):
handler.setLevel(level)
logger.addHandler(handler)
# pass the logger to the decorated function
result = func(logger, *args,**kwargs)
result = func(logger, *args, **kwargs)
return result
return wrapper
return func_decorator
@ -38,6 +41,15 @@ def run_fdbcli_command(*args):
commands = command_template + ["{}".format(' '.join(args))]
return subprocess.run(commands, stdout=subprocess.PIPE).stdout.decode('utf-8').strip()
def run_fdbcli_command_and_get_error(*args):
"""run the fdbcli statement: fdbcli --exec '<arg1> <arg2> ... <argN>'.
Returns:
string: Stderr output from fdbcli
"""
commands = command_template + ["{}".format(' '.join(args))]
return subprocess.run(commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stderr.decode('utf-8').strip()
@enable_logging()
def advanceversion(logger):
# get current read version
@ -82,6 +94,245 @@ def maintenance(logger):
output3 = run_fdbcli_command('maintenance')
assert output3 == no_maintenance_output
@enable_logging()
def setclass(logger):
output1 = run_fdbcli_command('setclass')
class_type_line_1 = output1.split('\n')[-1]
logger.debug(class_type_line_1)
# check process' network address
assert '127.0.0.1' in class_type_line_1
network_address = ':'.join(class_type_line_1.split(':')[:2])
logger.debug("Network address: {}".format(network_address))
# check class type
assert 'unset' in class_type_line_1
# check class source
assert 'command_line' in class_type_line_1
# set class to a random valid type
class_types = ['storage', 'storage', 'transaction', 'resolution',
'commit_proxy', 'grv_proxy', 'master', 'stateless', 'log',
'router', 'cluster_controller', 'fast_restore', 'data_distributor',
'coordinator', 'ratekeeper', 'storage_cache', 'backup'
]
random_class_type = random.choice(class_types)
logger.debug("Change to type: {}".format(random_class_type))
run_fdbcli_command('setclass', network_address, random_class_type)
# check the set successful
output2 = run_fdbcli_command('setclass')
class_type_line_2 = output2.split('\n')[-1]
logger.debug(class_type_line_2)
# check process' network address
assert network_address in class_type_line_2
# check class type changed to the specified value
assert random_class_type in class_type_line_2
# check class source
assert 'set_class' in class_type_line_2
# set back to default
run_fdbcli_command('setclass', network_address, 'default')
# everything should be back to the same as before
output3 = run_fdbcli_command('setclass')
class_type_line_3 = output3.split('\n')[-1]
logger.debug(class_type_line_3)
assert class_type_line_3 == class_type_line_1
@enable_logging()
def lockAndUnlock(logger):
# lock an unlocked database, should be successful
output1 = run_fdbcli_command('lock')
# UID is 32 bytes
lines = output1.split('\n')
lock_uid = lines[0][-32:]
assert lines[1] == 'Database locked.'
logger.debug("UID: {}".format(lock_uid))
assert get_value_from_status_json(True, 'cluster', 'database_lock_state', 'locked')
# lock a locked database, should get the error code 1038
output2 = run_fdbcli_command_and_get_error("lock")
assert output2 == 'ERROR: Database is locked (1038)'
# unlock the database
process = subprocess.Popen(command_template + ['unlock ' + lock_uid], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
line1 = process.stdout.readline()
# The randome passphrease we need to confirm to proceed the unlocking
line2 = process.stdout.readline()
logger.debug("Random passphrase: {}".format(line2))
output3, err = process.communicate(input=line2)
# No error and unlock was successful
assert err is None
assert output3.decode('utf-8').strip() == 'Database unlocked.'
assert not get_value_from_status_json(True, 'cluster', 'database_lock_state', 'locked')
@enable_logging()
def kill(logger):
output1 = run_fdbcli_command('kill')
lines = output1.split('\n')
assert len(lines) == 2
assert lines[1].startswith('127.0.0.1:')
address = lines[1]
logger.debug("Address: {}".format(address))
old_generation = get_value_from_status_json(False, 'cluster', 'generation')
# This is currently an issue with fdbcli,
# where you need to first run 'kill' to initialize processes' list
# and then specify the certain process to kill
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
#
output2, err = process.communicate(input='kill; kill {}\n'.format(address).encode())
logger.debug(output2)
# wait for a second for the cluster recovery
time.sleep(1)
new_generation = get_value_from_status_json(True, 'cluster', 'generation')
logger.debug("Old: {}, New: {}".format(old_generation, new_generation))
assert new_generation > old_generation
@enable_logging()
def suspend(logger):
output1 = run_fdbcli_command('suspend')
lines = output1.split('\n')
assert len(lines) == 2
assert lines[1].startswith('127.0.0.1:')
address = lines[1]
logger.debug("Address: {}".format(address))
db_available = get_value_from_status_json(False, 'client', 'database_status', 'available')
assert db_available
# use pgrep to get the pid of the fdb process
pinfos = subprocess.check_output(['pgrep', '-a', 'fdbserver']).decode().strip().split('\n')
port = address.split(':')[1]
logger.debug("Port: {}".format(port))
# use the port number to find the exact fdb process we are connecting to
pinfo = list(filter(lambda x: port in x, pinfos))
assert len(pinfo) == 1
pid = pinfo[0].split(' ')[0]
logger.debug("Pid: {}".format(pid))
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
# suspend the process for enough long time
output2, err = process.communicate(input='suspend; suspend 3600 {}\n'.format(address).encode())
# the cluster should be unavailable after the only process being suspended
assert not get_value_from_status_json(False, 'client', 'database_status', 'available')
# check the process pid still exists
pids = subprocess.check_output(['pidof', 'fdbserver']).decode().strip()
logger.debug("PIDs: {}".format(pids))
assert pid in pids
# kill the process by pid
kill_output = subprocess.check_output(['kill', pid]).decode().strip()
logger.debug("Kill result: {}".format(kill_output))
# The process should come back after a few time
duration = 0 # seconds we already wait
while not get_value_from_status_json(False, 'client', 'database_status', 'available') and duration < 60:
logger.debug("Sleep for 1 second to wait cluster recovery")
time.sleep(1)
duration += 1
# at most after 60 seconds, the cluster should be available
assert get_value_from_status_json(False, 'client', 'database_status', 'available')
def get_value_from_status_json(retry, *args):
while True:
result = json.loads(run_fdbcli_command('status', 'json'))
if result['client']['database_status']['available'] or not retry:
break
for arg in args:
assert arg in result
result = result[arg]
return result
@enable_logging()
def consistencycheck(logger):
consistency_check_on_output = 'ConsistencyCheck is on'
consistency_check_off_output = 'ConsistencyCheck is off'
output1 = run_fdbcli_command('consistencycheck')
assert output1 == consistency_check_on_output
run_fdbcli_command('consistencycheck', 'off')
output2 = run_fdbcli_command('consistencycheck')
assert output2 == consistency_check_off_output
run_fdbcli_command('consistencycheck', 'on')
output3 = run_fdbcli_command('consistencycheck')
assert output3 == consistency_check_on_output
@enable_logging()
def cache_range(logger):
# this command is currently experimental
# just test we can set and clear the cached range
run_fdbcli_command('cache_range', 'set', 'a', 'b')
run_fdbcli_command('cache_range', 'clear', 'a', 'b')
@enable_logging()
def datadistribution(logger):
output1 = run_fdbcli_command('datadistribution', 'off')
assert output1 == 'Data distribution is turned off.'
output2 = run_fdbcli_command('datadistribution', 'on')
assert output2 == 'Data distribution is turned on.'
output3 = run_fdbcli_command('datadistribution', 'disable', 'ssfailure')
assert output3 == 'Data distribution is disabled for storage server failures.'
# While we disable ssfailure, maintenance should fail
error_msg = run_fdbcli_command_and_get_error('maintenance', 'on', 'fake_zone_id', '1')
assert error_msg == "ERROR: Maintenance mode cannot be used while data distribution is disabled for storage server failures. Use 'datadistribution on' to reenable data distribution."
output4 = run_fdbcli_command('datadistribution', 'enable', 'ssfailure')
assert output4 == 'Data distribution is enabled for storage server failures.'
output5 = run_fdbcli_command('datadistribution', 'disable', 'rebalance')
assert output5 == 'Data distribution is disabled for rebalance.'
output6 = run_fdbcli_command('datadistribution', 'enable', 'rebalance')
assert output6 == 'Data distribution is enabled for rebalance.'
time.sleep(1)
@enable_logging()
def transaction(logger):
"""This test will cover the transaction related fdbcli commands.
In particular,
'begin', 'rollback', 'option'
'getversion', 'get', 'getrange', 'clear', 'clearrange', 'set', 'commit'
"""
err1 = run_fdbcli_command_and_get_error('set', 'key', 'value')
assert err1 == 'ERROR: writemode must be enabled to set or clear keys in the database.'
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
transaction_flow = ['writemode on', 'begin', 'getversion', 'set key value', 'get key', 'commit']
output1, _ = process.communicate(input='\n'.join(transaction_flow).encode())
# split the output into lines
lines = list(filter(len, output1.decode().split('\n')))[-4:]
assert lines[0] == 'Transaction started'
read_version = int(lines[1])
logger.debug("Read version {}".format(read_version))
# line[1] is the printed read version
assert lines[2] == "`key' is `value'"
assert lines[3].startswith('Committed (') and lines[3].endswith(')')
# validate commit version is larger than the read version
commit_verion = int(lines[3][len('Committed ('):-1])
logger.debug("Commit version: {}".format(commit_verion))
assert commit_verion >= read_version
# check the transaction is committed
output2 = run_fdbcli_command('get', 'key')
assert output2 == "`key' is `value'"
# test rollback and read-your-write behavior
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
transaction_flow = [
'writemode on', 'begin', 'getrange a z',
'clear key', 'get key',
# 'option on READ_YOUR_WRITES_DISABLE', 'get key',
'rollback'
]
output3, _ = process.communicate(input='\n'.join(transaction_flow).encode())
lines = list(filter(len, output3.decode().split('\n')))[-5:]
# lines[0] == "Transaction started" and lines[1] == 'Range limited to 25 keys'
assert lines[2] == "`key' is `value'"
assert lines[3] == "`key': not found"
assert lines[4] == "Transaction rolled back"
# make sure the rollback works
output4 = run_fdbcli_command('get', 'key')
assert output4 == "`key' is `value'"
# test read_your_write_disable option and clear the inserted key
process = subprocess.Popen(command_template[:-1], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
transaction_flow = [
'writemode on', 'begin',
'option on READ_YOUR_WRITES_DISABLE',
'clear key', 'get key',
'commit'
]
output6, _ = process.communicate(input='\n'.join(transaction_flow).encode())
lines = list(filter(len, output6.decode().split('\n')))[-4:]
assert lines[1] == 'Option enabled for current transaction'
# the get key should still return the value even we clear it in the transaction
assert lines[2] == "`key' is `value'"
# Check the transaction is committed
output7 = run_fdbcli_command('get', 'key')
assert output7 == "`key': not found"
if __name__ == '__main__':
# fdbcli_tests.py <path_to_fdbcli_binary> <path_to_fdb_cluster_file>
assert len(sys.argv) == 3, "Please pass arguments: <path_to_fdbcli_binary> <path_to_fdb_cluster_file>"
@ -90,4 +341,13 @@ if __name__ == '__main__':
# tests for fdbcli commands
# assertions will fail if fdbcli does not work as expected
advanceversion()
cache_range()
consistencycheck()
datadistribution()
kill()
lockAndUnlock()
maintenance()
setclass()
suspend()
transaction()

View File

@ -80,8 +80,9 @@ ACTOR Future<bool> clearHealthyZone(Reference<IDatabase> db,
ASSERT(res.size() <= 1);
if (!clearSSFailureZoneString && res.size() == 1 && res[0].key == fdb_cli::ignoreSSFailureSpecialKey) {
if (printWarning) {
printf("ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
fprintf(stderr,
"ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
}
return false;
}
@ -112,8 +113,9 @@ ACTOR Future<bool> setHealthyZone(Reference<IDatabase> db,
ASSERT(res.size() <= 1);
if (res.size() == 1 && res[0].key == fdb_cli::ignoreSSFailureSpecialKey) {
if (printWarning) {
printf("ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
fprintf(stderr,
"ERROR: Maintenance mode cannot be used while data distribution is disabled for storage "
"server failures. Use 'datadistribution on' to reenable data distribution.\n");
}
return false;
}

View File

@ -566,7 +566,8 @@ void initHelp() {
"change the class of a process",
"If no address and class are specified, lists the classes of all servers.\n\nSetting the class to "
"`default' resets the process class to the class specified on the command line. The available "
"classes are `unset', `storage', `transaction', `resolution', `proxy', `master', `test', `unset', "
"classes are `unset', `storage', `transaction', `resolution', `commit_proxy', `grv_proxy', "
"`master', `test', "
"`stateless', `log', `router', `cluster_controller', `fast_restore', `data_distributor', "
"`coordinator', `ratekeeper', `storage_cache', `backup', and `default'.");
helpMap["status"] =
@ -3604,9 +3605,10 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
state std::string passPhrase = deterministicRandom()->randomAlphaNumeric(10);
warn.cancel(); // don't warn while waiting on user input
printf("Unlocking the database is a potentially dangerous operation.\n");
Optional<std::string> input = wait(linenoise.read(
format("Repeat the following passphrase if you would like to proceed (%s) : ",
passPhrase.c_str())));
printf("%s\n", passPhrase.c_str());
fflush(stdout);
Optional<std::string> input =
wait(linenoise.read(format("Repeat the above passphrase if you would like to proceed:")));
warn = checkStatus(timeWarning(5.0, "\nWARNING: Long delay (Ctrl-C to interrupt)\n"), db);
if (input.present() && input.get() == passPhrase) {
UID unlockUID = UID::fromString(tokens[1].toString());