Upgrade Tests: Scripting download of old libraries and cluster upgrades; Specifying external library directory in C API Tester
This commit is contained in:
parent
70c60c69b8
commit
0f9ebaae37
|
@ -35,6 +35,7 @@ public:
|
|||
std::string traceFormat;
|
||||
std::string logGroup;
|
||||
std::string externalClientLibrary;
|
||||
std::string externalClientDir;
|
||||
std::string testFile;
|
||||
int numFdbThreads;
|
||||
int numClientThreads;
|
||||
|
|
|
@ -45,6 +45,7 @@ enum TesterOptionId {
|
|||
OPT_TRACE_FORMAT,
|
||||
OPT_KNOB,
|
||||
OPT_EXTERNAL_CLIENT_LIBRARY,
|
||||
OPT_EXTERNAL_CLIENT_DIRECTORY,
|
||||
OPT_TEST_FILE
|
||||
};
|
||||
|
||||
|
@ -59,6 +60,7 @@ CSimpleOpt::SOption TesterOptionDefs[] = //
|
|||
{ OPT_TRACE_FORMAT, "--trace-format", SO_REQ_SEP },
|
||||
{ OPT_KNOB, "--knob-", SO_REQ_SEP },
|
||||
{ OPT_EXTERNAL_CLIENT_LIBRARY, "--external-client-library", SO_REQ_SEP },
|
||||
{ OPT_EXTERNAL_CLIENT_DIRECTORY, "--external-client-dir", SO_REQ_SEP },
|
||||
{ OPT_TEST_FILE, "-f", SO_REQ_SEP },
|
||||
{ OPT_TEST_FILE, "--test-file", SO_REQ_SEP },
|
||||
SO_END_OF_OPTIONS };
|
||||
|
@ -84,6 +86,8 @@ void printProgramUsage(const char* execName) {
|
|||
" Changes a knob option. KNOBNAME should be lowercase.\n"
|
||||
" --external-client-library FILE\n"
|
||||
" Path to the external client library.\n"
|
||||
" --external-client-dir DIR\n"
|
||||
" Directory containing external client libraries.\n"
|
||||
" -f, --test-file FILE\n"
|
||||
" Test file to run.\n"
|
||||
" -h, --help Display this help and exit.\n");
|
||||
|
@ -139,7 +143,9 @@ bool processArg(TesterOptions& options, const CSimpleOpt& args) {
|
|||
case OPT_EXTERNAL_CLIENT_LIBRARY:
|
||||
options.externalClientLibrary = args.OptionArg();
|
||||
break;
|
||||
|
||||
case OPT_EXTERNAL_CLIENT_DIRECTORY:
|
||||
options.externalClientDir = args.OptionArg();
|
||||
break;
|
||||
case OPT_TEST_FILE:
|
||||
options.testFile = args.OptionArg();
|
||||
options.testSpec = readTomlTestSpec(options.testFile);
|
||||
|
@ -184,6 +190,10 @@ void applyNetworkOptions(TesterOptions& options) {
|
|||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
|
||||
fdb_check(
|
||||
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_LIBRARY, options.externalClientLibrary));
|
||||
} else if (!options.externalClientDir.empty()) {
|
||||
fdb_check(FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_DISABLE_LOCAL_CLIENT));
|
||||
fdb_check(
|
||||
FdbApi::setOption(FDBNetworkOption::FDB_NET_OPTION_EXTERNAL_CLIENT_DIRECTORY, options.externalClientDir));
|
||||
}
|
||||
|
||||
if (options.testSpec.multiThreaded) {
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
import random
|
||||
import string
|
||||
import subprocess
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
|
||||
|
||||
def _get_free_port_internal():
|
||||
|
@ -24,6 +26,12 @@ def get_free_port():
|
|||
return port
|
||||
|
||||
|
||||
def is_port_in_use(port):
|
||||
import socket
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
return s.connect_ex(('localhost', port)) == 0
|
||||
|
||||
|
||||
valid_letters_for_secret = string.ascii_letters + string.digits
|
||||
|
||||
|
||||
|
@ -98,9 +106,11 @@ logdir = {logdir}
|
|||
for _ in range(self.process_number)]
|
||||
self.cluster_desc = random_secret_string(8)
|
||||
self.cluster_secret = random_secret_string(8)
|
||||
self.env_vars = {}
|
||||
self.running = False
|
||||
self.process = None
|
||||
self.fdbmonitor_logfile = None
|
||||
self.use_legacy_conf_syntax = False
|
||||
if create_config:
|
||||
self.create_cluster_file()
|
||||
self.save_config()
|
||||
|
@ -115,7 +125,10 @@ logdir = {logdir}
|
|||
def save_config(self):
|
||||
new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new')
|
||||
with open(new_conf_file, 'x') as f:
|
||||
f.write(LocalCluster.configuration_template.format(
|
||||
conf_template = LocalCluster.configuration_template
|
||||
if (self.use_legacy_conf_syntax):
|
||||
conf_template = conf_template.replace("-", "_")
|
||||
f.write(conf_template.format(
|
||||
etcdir=self.etc,
|
||||
fdbserver_bin=self.fdbserver_binary,
|
||||
datadir=self.data,
|
||||
|
@ -154,14 +167,29 @@ logdir = {logdir}
|
|||
self.fdbmonitor_logfile = open(
|
||||
self.log.joinpath('fdbmonitor.log'), 'w')
|
||||
self.process = subprocess.Popen(
|
||||
args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile)
|
||||
args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile, env=self.process_env())
|
||||
self.running = True
|
||||
|
||||
def stop_cluster(self):
|
||||
assert self.running, "Server is not running"
|
||||
if self.process.poll() is None:
|
||||
self.process.terminate()
|
||||
self.running = True
|
||||
self.running = False
|
||||
|
||||
def ensure_ports_released(self, timeout_sec=5):
|
||||
sec = 0
|
||||
while (sec < timeout_sec):
|
||||
in_use = False
|
||||
for port in self.server_ports:
|
||||
if is_port_in_use(port):
|
||||
print("Port {} in use. Waiting for it to be released".format(port))
|
||||
in_use = True
|
||||
break
|
||||
if not in_use:
|
||||
return
|
||||
time.sleep(0.5)
|
||||
sec += 0.5
|
||||
assert False, "Failed to release ports in {}s".format(timeout_sec)
|
||||
|
||||
def __enter__(self):
|
||||
self.start_cluster()
|
||||
|
@ -169,9 +197,27 @@ logdir = {logdir}
|
|||
|
||||
def __exit__(self, xc_type, exc_value, traceback):
|
||||
self.stop_cluster()
|
||||
self.running = False
|
||||
|
||||
def create_database(self, storage='ssd'):
|
||||
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
|
||||
'configure new single {}'.format(storage)]
|
||||
subprocess.run(args)
|
||||
res = subprocess.run(args, env=self.process_env())
|
||||
assert res.returncode == 0, "Create database failed with {}".format(
|
||||
res.returncode)
|
||||
|
||||
def get_status(self):
|
||||
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
|
||||
'status json']
|
||||
res = subprocess.run(args, env=self.process_env(),
|
||||
stdout=subprocess.PIPE)
|
||||
assert res.returncode == 0, "Get status failed with {}".format(
|
||||
res.returncode)
|
||||
return json.loads(res.stdout)
|
||||
|
||||
def process_env(self):
|
||||
env = dict(os.environ)
|
||||
env.update(self.env_vars)
|
||||
return env
|
||||
|
||||
def set_env_var(self, var_name, var_val):
|
||||
self.env_vars[var_name] = var_val
|
||||
|
|
|
@ -0,0 +1,305 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
||||
import glob
|
||||
import os
|
||||
from pathlib import Path
|
||||
import platform
|
||||
import shutil
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
from threading import Thread
|
||||
import time
|
||||
from urllib import request
|
||||
|
||||
from local_cluster import LocalCluster, random_secret_string
|
||||
|
||||
|
||||
SUPPORTED_PLATFORMS = ["x86_64"]
|
||||
SUPPORTED_VERSIONS = ["7.1.0", "6.3.23",
|
||||
"6.3.22", "6.3.18", "6.3.17", "6.3.16", "6.3.15", "6.3.13", "6.3.12", "6.3.9", "6.2.30",
|
||||
"6.2.29", "6.2.28", "6.2.27", "6.2.26", "6.2.25", "6.2.24", "6.2.23", "6.2.22", "6.2.21",
|
||||
"6.2.20", "6.2.19", "6.2.18", "6.2.17", "6.2.16", "6.2.15", "6.2.10", "6.1.13", "6.1.12",
|
||||
"6.1.11", "6.1.10", "6.0.18", "6.0.17", "6.0.16", "6.0.15", "6.0.14", "5.2.8", "5.2.7",
|
||||
"5.1.7", "5.1.6"]
|
||||
FDB_DOWNLOAD_ROOT = "https://github.com/apple/foundationdb/releases/download/"
|
||||
CURRENT_VERSION = "7.1.0"
|
||||
|
||||
|
||||
def make_executable(path):
|
||||
mode = os.stat(path).st_mode
|
||||
st = os.stat(path)
|
||||
os.chmod(path, st.st_mode | stat.S_IEXEC)
|
||||
|
||||
|
||||
def version_from_str(ver_str):
|
||||
ver = [int(s) for s in ver_str.split(".")]
|
||||
assert len(ver) == 3, "Invalid version string {}".format(ver_str)
|
||||
return ver
|
||||
|
||||
|
||||
def version_before(ver_str1, ver_str2):
|
||||
return version_from_str(ver_str1) < version_from_str(ver_str2)
|
||||
|
||||
|
||||
class UpgradeTest:
|
||||
def __init__(self, build_dir: str, upgrade_path: list, process_number: int = 1, port: str = None):
|
||||
self.build_dir = Path(build_dir).resolve()
|
||||
assert self.build_dir.exists(), "{} does not exist".format(build_dir)
|
||||
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
|
||||
self.upgrade_path = upgrade_path
|
||||
for version in upgrade_path:
|
||||
assert version in SUPPORTED_VERSIONS, "Unsupported version {}".format(
|
||||
version)
|
||||
self.platform = platform.machine()
|
||||
assert self.platform in SUPPORTED_PLATFORMS, "Unsupported platform {}".format(
|
||||
self.platform)
|
||||
self.tmp_dir = self.build_dir.joinpath(
|
||||
"tmp",
|
||||
random_secret_string(16)
|
||||
)
|
||||
self.tmp_dir.mkdir(parents=True)
|
||||
self.download_dir = self.build_dir.joinpath(
|
||||
"tmp",
|
||||
"old_binaries"
|
||||
)
|
||||
self.download_old_binaries()
|
||||
self.create_external_lib_dir()
|
||||
init_version = upgrade_path[0]
|
||||
self.cluster = LocalCluster(
|
||||
self.tmp_dir,
|
||||
self.binary_path(init_version, "fdbserver"),
|
||||
self.binary_path(init_version, "fdbmonitor"),
|
||||
self.binary_path(init_version, "fdbcli"),
|
||||
process_number,
|
||||
port=port,
|
||||
create_config=False
|
||||
)
|
||||
self.cluster.create_cluster_file()
|
||||
self.configure_version(init_version)
|
||||
self.log = self.cluster.log
|
||||
self.etc = self.cluster.etc
|
||||
self.data = self.cluster.data
|
||||
|
||||
def binary_path(self, version, bin_name):
|
||||
if version == CURRENT_VERSION:
|
||||
return self.build_dir.joinpath("bin", bin_name)
|
||||
else:
|
||||
return self.download_dir.joinpath(version, bin_name)
|
||||
|
||||
def lib_dir(self, version):
|
||||
if version == CURRENT_VERSION:
|
||||
return self.build_dir.joinpath("lib")
|
||||
else:
|
||||
return self.download_dir.joinpath(version)
|
||||
|
||||
def download_old_binary(self, version, target_bin_name, remote_bin_name, executable):
|
||||
local_file = self.binary_path(version, target_bin_name)
|
||||
if (local_file.exists()):
|
||||
return
|
||||
self.download_dir.joinpath(version).mkdir(
|
||||
parents=True, exist_ok=True)
|
||||
remote_file = "{}{}/{}".format(FDB_DOWNLOAD_ROOT,
|
||||
version, remote_bin_name)
|
||||
print("Downloading '{}' to '{}'...".format(remote_file, local_file))
|
||||
request.urlretrieve(remote_file, local_file)
|
||||
print("Download complete")
|
||||
assert local_file.exists(), "{} does not exist".format(local_file)
|
||||
if executable:
|
||||
make_executable(local_file)
|
||||
|
||||
def download_old_binaries(self):
|
||||
for version in self.upgrade_path:
|
||||
if version == CURRENT_VERSION:
|
||||
continue
|
||||
self.download_old_binary(version,
|
||||
"fdbserver", "fdbserver.{}".format(self.platform), True)
|
||||
self.download_old_binary(version,
|
||||
"fdbmonitor", "fdbmonitor.{}".format(self.platform), True)
|
||||
self.download_old_binary(version,
|
||||
"fdbcli", "fdbcli.{}".format(self.platform), True)
|
||||
self.download_old_binary(version,
|
||||
"libfdb_c.so", "libfdb_c.{}.so".format(self.platform), False)
|
||||
|
||||
def create_external_lib_dir(self):
|
||||
self.external_lib_dir = self.tmp_dir.joinpath("client_libs")
|
||||
self.external_lib_dir.mkdir(parents=True)
|
||||
for version in self.upgrade_path:
|
||||
src_file_path = self.lib_dir(version).joinpath("libfdb_c.so")
|
||||
assert src_file_path.exists(), "{} does not exist".format(src_file_path)
|
||||
target_file_path = self.external_lib_dir.joinpath(
|
||||
"libfdb_c.{}.so".format(version))
|
||||
shutil.copyfile(src_file_path, target_file_path)
|
||||
|
||||
def health_check(self, timeout_sec=5):
|
||||
retries = 0
|
||||
while retries < timeout_sec:
|
||||
retries += 1
|
||||
status = self.cluster.get_status()
|
||||
if not "processes" in status["cluster"]:
|
||||
print("Health check: no processes found. Retrying")
|
||||
time.sleep(1)
|
||||
continue
|
||||
num_proc = len(status["cluster"]["processes"])
|
||||
if (num_proc < self.cluster.process_number):
|
||||
print("Health check: {} of {} processes found. Retrying",
|
||||
num_proc, self.cluster.process_number)
|
||||
time.sleep(1)
|
||||
continue
|
||||
assert num_proc == self.cluster.process_number, "Number of processes: expected: {}, actual: {}".format(
|
||||
self.cluster.process_number, num_proc)
|
||||
for (_, proc_stat) in status["cluster"]["processes"].items():
|
||||
proc_ver = proc_stat["version"]
|
||||
assert proc_ver == self.cluster_version, "Process version: expected: {}, actual: {}".format(
|
||||
self.cluster_version, proc_ver)
|
||||
print("Health check: OK")
|
||||
return
|
||||
assert False, "Health check: Failed"
|
||||
|
||||
def configure_version(self, version):
|
||||
self.cluster.fdbmonitor_binary = self.binary_path(
|
||||
version, "fdbmonitor")
|
||||
self.cluster.fdbserver_binary = self.binary_path(version, "fdbserver")
|
||||
self.cluster.fdbcli_binary = self.binary_path(version, "fdbcli")
|
||||
self.cluster.set_env_var = "LD_LIBRARY_PATH", self.lib_dir(version)
|
||||
if (version_before(version, "7.1.0")):
|
||||
self.cluster.use_legacy_conf_syntax = True
|
||||
self.cluster.save_config()
|
||||
self.cluster_version = version
|
||||
|
||||
def upgrade_to(self, version):
|
||||
print("Upgrading to version {}".format(version))
|
||||
self.cluster.stop_cluster()
|
||||
self.configure_version(version)
|
||||
self.cluster.ensure_ports_released()
|
||||
self.cluster.start_cluster()
|
||||
print("Upgraded to {}".format(version))
|
||||
|
||||
def __enter__(self):
|
||||
print("Starting cluster version {}".format(self.cluster_version))
|
||||
self.cluster.start_cluster()
|
||||
self.cluster.create_database()
|
||||
return self
|
||||
|
||||
def __exit__(self, xc_type, exc_value, traceback):
|
||||
self.cluster.stop_cluster()
|
||||
shutil.rmtree(self.tmp_dir)
|
||||
|
||||
def exec_workload(self, test_file):
|
||||
cmd_args = [self.tester_bin,
|
||||
'--cluster-file', self.cluster.cluster_file,
|
||||
'--test-file', test_file,
|
||||
'--external-client-dir', self.external_lib_dir]
|
||||
retcode = subprocess.run(
|
||||
cmd_args, stdout=sys.stdout, stderr=sys.stderr,
|
||||
).returncode
|
||||
return retcode
|
||||
|
||||
def exec_upgrade_test(self):
|
||||
self.health_check()
|
||||
for version in self.upgrade_path[1:]:
|
||||
self.upgrade_to(version)
|
||||
self.health_check()
|
||||
|
||||
def exec_test(self, args):
|
||||
self.tester_bin = self.build_dir.joinpath("bin", "fdb_c_api_tester")
|
||||
assert self.tester_bin.exists(), "{} does not exist".format(self.tester_bin)
|
||||
|
||||
thread = Thread(target=self.exec_workload, args=(args.test_file))
|
||||
thread.start()
|
||||
self.exec_upgrade_test()
|
||||
retcode = thread.join()
|
||||
return retcode
|
||||
|
||||
def check_cluster_logs(self, error_limit=100):
|
||||
sev40s = (
|
||||
subprocess.getoutput(
|
||||
"grep -r 'Severity=\"40\"' {}".format(
|
||||
self.cluster.log.as_posix())
|
||||
)
|
||||
.rstrip()
|
||||
.splitlines()
|
||||
)
|
||||
|
||||
err_cnt = 0
|
||||
for line in sev40s:
|
||||
# When running ASAN we expect to see this message. Boost coroutine should be using the correct asan annotations so that it shouldn't produce any false positives.
|
||||
if line.endswith(
|
||||
"WARNING: ASan doesn't fully support makecontext/swapcontext functions and may produce false positives in some cases!"
|
||||
):
|
||||
continue
|
||||
if (err_cnt < error_limit):
|
||||
print(line)
|
||||
err_cnt += 1
|
||||
|
||||
if err_cnt > 0:
|
||||
print(
|
||||
">>>>>>>>>>>>>>>>>>>> Found {} severity 40 events - the test fails", err_cnt)
|
||||
return err_cnt == 0
|
||||
|
||||
def dump_cluster_logs(self):
|
||||
for etc_file in glob.glob(os.path.join(self.cluster.etc, "*")):
|
||||
print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(etc_file))
|
||||
with open(etc_file, "r") as f:
|
||||
print(f.read())
|
||||
for log_file in glob.glob(os.path.join(self.cluster.log, "*")):
|
||||
print(">>>>>>>>>>>>>>>>>>>> Contents of {}:".format(log_file))
|
||||
with open(log_file, "r") as f:
|
||||
print(f.read())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = ArgumentParser(
|
||||
formatter_class=RawDescriptionHelpFormatter,
|
||||
description="""
|
||||
TBD
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--build-dir",
|
||||
"-b",
|
||||
metavar="BUILD_DIRECTORY",
|
||||
help="FDB build directory",
|
||||
required=True,
|
||||
)
|
||||
parser.add_argument(
|
||||
'--upgrade-path',
|
||||
nargs='+',
|
||||
help='Cluster upgrade path: a space separated list of versions',
|
||||
default=[CURRENT_VERSION]
|
||||
)
|
||||
parser.add_argument(
|
||||
'--test-file',
|
||||
nargs='+',
|
||||
help='A .toml file describing a test workload to be generated with fdb_c_api_tester',
|
||||
required=True,
|
||||
default=[CURRENT_VERSION]
|
||||
)
|
||||
parser.add_argument(
|
||||
"--process-number",
|
||||
"-p",
|
||||
help="Number of fdb processes running",
|
||||
type=int,
|
||||
default=1,
|
||||
)
|
||||
parser.add_argument(
|
||||
'--disable-log-dump',
|
||||
help='Do not dump cluster log on error',
|
||||
action="store_true"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
errcode = 1
|
||||
with UpgradeTest(args.build_dir, args.upgrade_path, args.process_number) as test:
|
||||
print("log-dir: {}".format(test.log))
|
||||
print("etc-dir: {}".format(test.etc))
|
||||
print("data-dir: {}".format(test.data))
|
||||
print("cluster-file: {}".format(test.etc.joinpath("fdb.cluster")))
|
||||
errcode = test.exec_test(args)
|
||||
if test.check_cluster_logs():
|
||||
errcode = 1 if errcode == 0 else errcode
|
||||
if errcode != 0 and not args.disable_log_dump:
|
||||
test.dump_cluster_logs()
|
||||
|
||||
sys.exit(errcode)
|
Loading…
Reference in New Issue