Merge pull request from xis19/binding_test

Scripts to start a local cluster and run the binding test in batch
Jingyu Zhou 2023-01-23 17:47:19 -08:00 committed by GitHub
commit 4baab48db6
9 changed files with 1095 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
# `local_cluster` library
The `local_cluster` library provides a way to spawn local FoundationDB processes.
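
For example, a minimal sketch of spawning a single-process cluster with this library (assuming `fdbserver` and `fdbcli` are discoverable, e.g. on `PATH`):

```python
import asyncio

import lib.local_cluster


async def main():
    # Spawns one fdbserver in a temporary work directory, configures a new
    # database, and terminates the process on context exit
    async with lib.local_cluster.FDBServerLocalCluster(1) as cluster:
        print(f"Cluster file: {cluster.cluster_file}")


asyncio.run(main())
```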

View File: binding_test.py

@@ -0,0 +1,358 @@
#! /usr/bin/env python3
import argparse
import asyncio
import logging
import os
import os.path
import sys
import lib.fdb_process
import lib.local_cluster
import lib.process
from typing import List
logger = logging.getLogger("binding_test")
SCRIPT_DIR = os.path.split(os.path.abspath(__file__))[0]
# At this stage, the binaries are expected to live alongside this script
BINARY_DIR = SCRIPT_DIR
DEFAULT_FDBSERVER_PATH = os.path.join(BINARY_DIR, "fdbserver")
DEFAULT_FDBCLI_PATH = os.path.join(BINARY_DIR, "fdbcli")
# This is used as LD_LIBRARY_PATH, so it is a directory; no file name is included
DEFAULT_LIBFDB_PATH = os.path.abspath(BINARY_DIR)
DEFAULT_BINDINGTESTER = os.path.join(
BINARY_DIR, "tests", "bindingtester", "bindingtester.py"
)
# The binding test requires a working Python binding. The system-default
# binding may be outdated or missing, so the binding shipped with this package
# is used and is assumed to work.
DEFAULT_PYTHON_BINDER = os.path.join(BINARY_DIR, "tests", "python")
DEFAULT_CONCURRENCY = 5
DEFAULT_OPERATIONS = 1000
DEFAULT_HCA_OPERATIONS = 100
DEFAULT_TIMEOUT_PER_TEST = 360.0
def _setup_logs(log_level: int = logging.INFO):
log_format = logging.Formatter(
"%(asctime)s | %(name)20s :: %(levelname)-8s :: %(message)s"
)
logger.handlers.clear()
    stderr_handler = logging.StreamHandler(stream=sys.stderr)
    stderr_handler.setLevel(log_level)
    stderr_handler.setFormatter(log_format)
    logger.addHandler(stderr_handler)
logger.setLevel(log_level)
# Here we might lose some of the logging from lib
lib_logger = logging.getLogger("lib")
lib_logger.setLevel(log_level)
def _setup_args() -> argparse.Namespace:
"""Parse the command line arguments"""
parser = argparse.ArgumentParser("binding_test.py")
parser.add_argument("--num-cycles", type=int, default=1, help="Number of cycles")
parser.add_argument(
"--debug", action="store_true", default=False, help="Debug logging"
)
parser.add_argument(
"--stop-at-failure",
type=int,
default=-1,
help="Stop the test at binding test failure",
)
parser.add_argument(
"--fdbserver-path",
type=str,
default=DEFAULT_FDBSERVER_PATH,
help="Path to fdbserver",
)
parser.add_argument(
"--fdbcli-path", type=str, default=DEFAULT_FDBCLI_PATH, help="Path to fdbcli"
)
parser.add_argument(
"--libfdb-path",
type=str,
default=DEFAULT_LIBFDB_PATH,
help="Path to libfdb.so. NOTE: The file name should not be included.",
)
parser.add_argument(
"--binding-tester-path",
type=str,
default=DEFAULT_BINDINGTESTER,
help="Path to binding tester",
)
parser.add_argument(
"--num-ops", type=int, default=DEFAULT_OPERATIONS, help="Num ops in test"
)
parser.add_argument(
"--num-hca-ops",
type=int,
default=DEFAULT_HCA_OPERATIONS,
help="Num ops in HCA test",
)
parser.add_argument(
"--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="Concurrency level"
)
parser.add_argument(
"--test-timeout",
type=float,
default=DEFAULT_TIMEOUT_PER_TEST,
help="Timeout for each single test",
)
return parser.parse_args()
def _check_file(path: str, executable: bool = True):
if not os.path.exists(path):
raise RuntimeError(f"{path} not found")
if executable and (not os.path.isfile(path) or not os.access(path, os.X_OK)):
raise RuntimeError(f"{path} not executable")
# TODO: it might be better to import the binding tester rather than invoking it via subprocess
class TestSet:
def __init__(
self,
binding_tester: str,
num_ops: int,
num_hca_ops: int,
concurrency: int,
ld_library_path: str,
timeout: float,
logging_level: str = "INFO",
) -> None:
self._binding_tester = binding_tester
self._num_ops = num_ops
self._num_hca_ops = num_hca_ops
self._concurrency = concurrency
self._timeout = timeout
self._logging_level = logging_level
self._env = dict(os.environ)
self._update_path_from_env("LD_LIBRARY_PATH", ld_library_path)
self._update_path_from_env("PYTHONPATH", DEFAULT_PYTHON_BINDER)
def _update_path_from_env(self, environment_variable_name: str, new_path: str):
original_path = os.getenv(environment_variable_name)
self._env[environment_variable_name] = (
f"{new_path}:{original_path}" if original_path else new_path
)
        logger.debug(
            f"{environment_variable_name} for binding tester: {self._env[environment_variable_name]}"
        )
async def _test_coroutine(
self,
api_language: str,
test_name: str,
additional_args: List[str],
):
arguments = [
api_language,
"--test-name",
test_name,
"--logging-level",
self._logging_level,
]
arguments += additional_args
process = await lib.process.Process(
executable=self._binding_tester,
arguments=arguments,
env=self._env,
).run()
        try:
            await asyncio.wait_for(process.wait(), timeout=self._timeout)
        except asyncio.TimeoutError:
            # Kill the process on timeout, so draining the pipes below cannot block forever
            process.kill()
            raise
        finally:
stdout = (await process.stdout.read(-1)).decode("utf-8")
stderr = (await process.stderr.read(-1)).decode("utf-8")
if len(stdout):
logger.info("API Test stdout:\n{}".format(stdout))
else:
logger.info("API Test stdout: [Empty]")
if len(stderr):
logger.warning("API Test stderr:\n{}".format(stderr))
else:
logger.info("API Test stderr: [Empty]")
async def _run_test(
self,
api_language: str,
test_name: str,
additional_args: List[str],
):
logger.debug(f"Run test API [{api_language}] Test name [{test_name}]")
try:
await self._test_coroutine(
api_language=api_language,
test_name=test_name,
additional_args=additional_args,
)
        except asyncio.TimeoutError:
logger.exception(
f"Test API [{api_language}] Test name [{test_name}] failed due to timeout {self._timeout}"
)
return False
except Exception as e:
logger.exception(
f"Test API [{api_language}] Test name [{test_name}] failed with exception: {str(e)}"
)
return False
logger.debug(f"Test API [{api_language}] Test name [{test_name}] completed")
return True
async def run_scripted_test(self, test: str):
return await self._run_test(test, "scripted", [])
async def run_api_test(self, test: str):
return await self._run_test(
test, "api", ["--compare", "--num-ops", str(self._num_ops)]
)
async def run_api_concurrency_test(self, test: str):
return await self._run_test(
test,
"api",
["--concurrency", str(self._concurrency), "--num-ops", str(self._num_ops)],
)
async def run_directory_test(self, test: str):
return await self._run_test(
test,
"directory",
[
"--compare",
"--num-ops",
str(self._num_ops),
],
)
async def run_directory_hca_test(self, test: str):
return await self._run_test(
test,
"directory_hca",
[
"--concurrency",
str(self._concurrency),
"--num-ops",
str(self._num_hca_ops),
],
)
API_LANGUAGES = [
"python3",
"java",
"java_async",
"go",
"flow",
]
def _log_cluster_lines_with_severity(
    cluster: lib.local_cluster.FDBServerLocalCluster, severity: int
):
    # Map the FDB trace severity to a logging method
    if severity == 40:
        reporter = logger.error
    elif severity == 30:
        reporter = logger.warning
    elif severity == 20:
        reporter = logger.info
    else:
        reporter = logger.debug
    for process_handlers in cluster.handlers:
        for log_file, lines in process_handlers.get_log_with_severity(severity).items():
            if len(lines) == 0:
                reporter(f"{log_file}: No Severity={severity} lines")
            else:
                reporter(
                    "{}: {} lines with Severity={}\n{}".format(
                        log_file, len(lines), severity, "".join(lines)
                    )
                )
async def run_binding_tests(
    test_set: TestSet, num_cycles: int, stop_at_failure: int = -1
):
tests = [
test_set.run_scripted_test,
test_set.run_api_test,
test_set.run_api_concurrency_test,
test_set.run_directory_test,
test_set.run_directory_hca_test,
]
num_failures: int = 0
async def run_tests():
nonlocal num_failures
for api_language in API_LANGUAGES:
for test in tests:
test_success = await test(api_language)
if not test_success:
num_failures += 1
                if 0 <= stop_at_failure < num_failures:
                    raise RuntimeError(
                        f"Reached the maximum number of test failures ({stop_at_failure})"
                    )
async with lib.local_cluster.FDBServerLocalCluster(1) as local_cluster:
logger.info("Start binding test")
try:
for cycle in range(num_cycles):
logger.info(f"Starting cycle {cycle}")
await run_tests()
    except Exception:
logger.exception("Error found during the binding test")
finally:
logger.info(f"Binding test completed with {num_failures} failures")
_log_cluster_lines_with_severity(local_cluster, 40)
_log_cluster_lines_with_severity(local_cluster, 30)
def main():
args = _setup_args()
    _setup_logs(logging.DEBUG if args.debug else logging.INFO)
_check_file(args.fdbserver_path, True)
_check_file(args.fdbcli_path, True)
_check_file(args.libfdb_path, False)
lib.fdb_process.set_fdbserver_path(args.fdbserver_path)
lib.fdb_process.set_fdbcli_path(args.fdbcli_path)
logger.info(f"Executable: {__file__}")
logger.info(f"PID: {os.getpid()}")
logger.info(f"fdbserver: {args.fdbserver_path}")
logger.info(f"fdbcli: {args.fdbcli_path}")
logger.info(f"libfdb: {args.libfdb_path}")
logger.info(f"NumCycles: {args.num_cycles}")
test_set = TestSet(
binding_tester=args.binding_tester_path,
num_ops=args.num_ops,
num_hca_ops=args.num_hca_ops,
concurrency=args.concurrency,
ld_library_path=args.libfdb_path,
timeout=args.test_timeout,
)
asyncio.run(run_binding_tests(test_set, args.num_cycles, args.stop_at_failure))
return 0
if __name__ == "__main__":
sys.exit(main())
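
For reference, a sample invocation of this script with the flags defined above (the counts are illustrative):

```
$ ./binding_test.py --num-cycles 3 --stop-at-failure 5 --num-ops 500 --debug
```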

View File: lib/__init__.py

@@ -0,0 +1,20 @@
import logging
import sys
def _setup_logs():
logger = logging.getLogger(__name__)
logger.handlers.clear()
log_format = logging.Formatter(
"%(asctime)s | %(name)20s :: %(levelname)-8s :: %(message)s"
)
    stderr_handler = logging.StreamHandler(stream=sys.stderr)
    stderr_handler.setFormatter(log_format)
    logger.addHandler(stderr_handler)
_setup_logs()

View File: lib/cluster_file.py

@@ -0,0 +1,36 @@
import ipaddress
import logging
import os.path
import secrets
from typing import Union
logger = logging.getLogger(__name__)
def generate_fdb_cluster_file(
base_directory: str,
description: Union[str, None] = None,
ip_address: Union[ipaddress.IPv4Address, None] = None,
port: Union[int, None] = None,
) -> str:
"""Generate a fdb.cluster file
:param str base_directory: The place the fdb.cluster will be created
:param Union[str, None] description: Cluster description
:param Union[ipaddress.IPv4Address, None] ip_address: IP, defaults to None
:param Union[int, None] port: Port, defaults to None
:return str: Path to the cluster file
"""
cluster_file_name = os.path.join(base_directory, "fdb.cluster")
description = description or secrets.token_hex(10)
ip_address = ip_address or ipaddress.ip_address("127.0.0.1")
port = port or 4000
    # FoundationDB cluster-file format is description:ID@IP:PORT; the random
    # description doubles as the ID here
    content = f"{description}:{description}@{ip_address}:{port}"
with open(cluster_file_name, "w") as stream:
stream.write(content)
logger.debug(f"Generated cluster file with content: {content}")
return cluster_file_name
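
For reference, a file generated with the defaults looks like the following (illustrative random token), with the random description reused as the ID:

```
0123456789abcdef0123:0123456789abcdef0123@127.0.0.1:4000
```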

View File: lib/fdb_process.py

@@ -0,0 +1,254 @@
import asyncio
import glob
import ipaddress
import json
import logging
import os.path
import shutil
import lib.process
from typing import Dict, List, Union
logger = logging.getLogger(__name__)
FDBSERVER_TIMEOUT: float = 180.0
FDBCLI_TIMEOUT: float = 180.0
FDBCLI_RETRY_TIME: float = 1.0
class FileNotFoundError(OSError):
    """File not found error
    NOTE: shadows the builtin FileNotFoundError within this module
    """
    def __init__(self, strerror: str, filename: str, *args, **kwargs):
        """Constructor
        :param str strerror: error text
        :param str filename: file name
        """
        super().__init__(*args, **kwargs)
        self._strerror = strerror
        self._filename = filename
    @property
    def filename(self) -> str:
        """Name of the file"""
        return self._filename
@property
def strerror(self) -> str:
"""Error context"""
return self._strerror
class _ExecutablePath:
"""Path to executable"""
def __init__(self, executable: str, overridden_path: str = None):
"""Constructor
:param str executable:
:param str overridden_path:
:raises FileNotFoundError:
"""
self._executable = executable
self._path = None
if overridden_path:
self.set(overridden_path)
def set(self, overridden_path: str = None):
"""Set and validate the path
:param str overridden_path:
"""
path = overridden_path
if path is None:
path = shutil.which(self._executable)
if path is None or not os.path.exists(path):
            raise FileNotFoundError(
                f"{self._executable} not found in {path}", self._executable
            )
logger.debug(f"Setting {self._executable} executable to {path}")
self._path = path
def __str__(self) -> str:
return self._path
_fdbserver_path: _ExecutablePath = _ExecutablePath("fdbserver")
_fdbcli_path: _ExecutablePath = _ExecutablePath("fdbcli")
def set_fdbserver_path(path: Union[str, None] = None):
    """Set the path to the fdbserver executable
    :raises FileNotFoundError: if fdbserver is not found
    """
_fdbserver_path.set(path)
def get_fdbserver_path() -> str:
"""Gets the path to fdbserver executable"""
return str(_fdbserver_path)
def set_fdbcli_path(path: Union[str, None] = None):
    """Set the path to the fdbcli executable
    :raises FileNotFoundError: if fdbcli is not found
    """
_fdbcli_path.set(path)
def get_fdbcli_path() -> str:
"""Get the path to fdbcli executable"""
return str(_fdbcli_path)
class FDBServerProcess(lib.process.Process):
"""Maintain a FDB server process as coroutine"""
def __init__(
self,
cluster_file: str,
public_ip_address: Union[ipaddress.IPv4Address, None] = None,
port: Union[int, None] = None,
class_: str = None,
data_path: str = None,
log_path: str = None,
fdbserver_overridden_path: str = None,
):
"""Constructor
:param str cluster_file: Path to the cluster file
:param ipaddress.IPv4Address public_ip_address: IP address
:param int port: Port the FDB uses
:param str class_: FDB process class
:param str data_path: Path to the database files
:param str log_path: Path to log files
:param str fdbserver_overridden_path:
"""
super().__init__(executable=str(fdbserver_overridden_path or _fdbserver_path))
self._cluster_file = cluster_file
self._public_address = f"{public_ip_address or '127.0.0.1'}:{port or 4000}"
self._class_ = class_
self._data_path = data_path
self._log_path = log_path
async def run(self):
self._args = ["--cluster-file", self._cluster_file, "-p", self._public_address]
if self._class_:
self._args.extend("-c", self._class_)
if self._data_path:
self._args.extend(["--datadir", self._data_path])
if self._log_path:
self._args.extend(["--logdir", self._log_path])
return await super().run()
    def _iterate_log_files(self) -> List[str]:
        """Iterate over the XML log files in the log directory
        :return List[str]: paths to the log files
        """
pattern = os.path.join(self._log_path, "*.xml")
for path in glob.glob(pattern, recursive=False):
yield path
def get_log_with_severity(self, severity=40) -> Dict[str, List[str]]:
"""Get the lines with given severity from XML log
:return Dict[str, List[str]]:
"""
result = {}
severity_string = f'Severity="{severity}"'
for log_file in self._iterate_log_files():
result[log_file] = []
with open(log_file) as stream:
for line in stream:
if severity_string in line:
result[log_file].append(line)
return result
class FDBCLIProcess(lib.process.Process):
"""Maintain a FDB CLI process as coroutine"""
def __init__(self, cluster_file: str, commands: Union[List[str], str, None] = None):
"""Constructor
:param str cluster_file: Path to the cluster file
:param List[str] commands: Commands
"""
super().__init__(executable=str(_fdbcli_path))
self._cluster_file = cluster_file
self._commands = commands
async def run(self):
self._args = ["-C", self._cluster_file]
if isinstance(self._commands, list):
self._args.extend(["--exec", ";".join(self._commands)])
elif isinstance(self._commands, str):
self._args.extend(["--exec", self._commands])
return await super().run()
async def get_server_status(cluster_file: str) -> Union[Dict, None]:
"""Get the status of fdbserver via fdbcli
:param str cluster_file: path to the cluster file
:return Union[Dict, None]:
"""
    fdbcli_process = await FDBCLIProcess(
        cluster_file=cluster_file, commands="status json"
    ).run()
try:
output = await asyncio.wait_for(fdbcli_process.stdout.read(-1), FDBCLI_TIMEOUT)
await fdbcli_process.wait()
return json.loads(output.decode())
    except asyncio.TimeoutError:
return None
async def wait_fdbserver_up(cluster_file: str):
    """Wait until the fdbserver responds to fdbcli
    :param str cluster_file: path to the cluster file
    """
async def impl():
response = await get_server_status(cluster_file=cluster_file)
while response is None:
await asyncio.sleep(FDBCLI_RETRY_TIME)
response = await get_server_status(cluster_file=cluster_file)
await asyncio.wait_for(impl(), FDBSERVER_TIMEOUT)
async def wait_fdbserver_available(cluster_file: str):
    """Wait until the database becomes available
    :param str cluster_file: path to the cluster file
    """
async def impl():
while True:
status = await get_server_status(cluster_file)
if status is not None and status.get("client", {}).get(
"database_status", {}
).get("available", False):
break
await asyncio.sleep(FDBCLI_RETRY_TIME)
await asyncio.wait_for(impl(), FDBSERVER_TIMEOUT)
try:
    set_fdbserver_path()
except FileNotFoundError:
    logger.warning("Cannot find fdbserver at default location")
try:
    set_fdbcli_path()
except FileNotFoundError:
    logger.warning("Cannot find fdbcli at default location")

View File: lib/local_cluster.py

@@ -0,0 +1,153 @@
import asyncio
import logging
import os
import os.path
import lib.cluster_file
import lib.fdb_process
import lib.work_directory
from typing import List, Union
logger = logging.getLogger(__name__)
FDB_DEFAULT_PORT = 4000
async def configure_fdbserver(cluster_file: str):
await lib.fdb_process.wait_fdbserver_up(cluster_file=cluster_file)
await (
await lib.fdb_process.FDBCLIProcess(
cluster_file=cluster_file,
commands="configure new single memory tenant_mode=optional_experimental",
).run()
).wait()
await lib.fdb_process.wait_fdbserver_available(cluster_file=cluster_file)
async def spawn_fdbservers(
num_processes: int,
directory: lib.work_directory.WorkDirectory,
cluster_file: str,
port: Union[int, None] = None,
):
fdb_processes = {"handlers": [], "processes": []}
initial_port = port or FDB_DEFAULT_PORT
for i in range(num_processes):
data_path = os.path.join(directory.data_directory, str(i))
os.makedirs(data_path, exist_ok=True)
log_path = os.path.join(directory.log_directory, str(i))
os.makedirs(log_path, exist_ok=True)
fdb_server_process = lib.fdb_process.FDBServerProcess(
cluster_file=cluster_file,
port=initial_port,
data_path=data_path,
log_path=log_path,
)
fdb_processes["handlers"].append(fdb_server_process)
fdb_processes["processes"].append(await fdb_server_process.run())
initial_port += 1
return fdb_processes
class FDBServerLocalCluster:
def __init__(
self,
num_processes: int,
work_directory: Union[str, None] = None,
cluster_file: Union[str, None] = None,
port: Union[int, None] = None,
):
"""Constructor
:param int num_processes: _description_
:param Union[str, None] work_directory: _description_, defaults to None
:param Union[str, None] cluster_file: _description_, defaults to None
:param Union[int, None] port: _description_, defaults to None
"""
self._num_processes: int = num_processes
self._work_directory: Union[str, None] = work_directory
self._cluster_file: Union[str, None] = cluster_file
self._port: int = port
self._processes = None
@property
def work_directory(self) -> Union[str, None]:
"""Work directory
        :return str: path to the work directory
"""
return self._work_directory
@property
def cluster_file(self) -> Union[str, None]:
"""Path to the cluster file
        :return str: path to the cluster file
"""
return self._cluster_file
@property
def processes(self):
"""Processes
"""
return self._processes["processes"]
@property
def handlers(self):
"""Handlers
"""
return self._processes["handlers"]
def terminate(self):
"""Terminate the cluster
"""
# Send SIGTERM
logger.debug("Sending SIGTERM")
for process in self.processes:
process.terminate()
# Send SIGKILL
logger.debug("Sending SIGKILL")
for process in self.processes:
process.kill()
async def run(self) -> List[asyncio.subprocess.Process]:
directory = lib.work_directory.WorkDirectory(self.work_directory)
directory.setup()
self._work_directory = directory.base_directory
logger.info(f"Work directory: {directory.base_directory}")
if not self._cluster_file:
self._cluster_file = lib.cluster_file.generate_fdb_cluster_file(
directory.base_directory
)
self._processes = await spawn_fdbservers(
self._num_processes, directory, self.cluster_file, self._port
)
await configure_fdbserver(self._cluster_file)
logger.info("FoundationDB ready to use")
    async def __aenter__(self):
        """Enter the context
        :return FDBServerLocalCluster: this cluster object
        """
        await self.run()
        return self
    async def __aexit__(self, exc_type, exc, tb):
        """Exit the context, terminating the cluster"""
        self.terminate()

View File: lib/process.py

@@ -0,0 +1,82 @@
""" Process management """
import asyncio
import logging
from typing import Dict, List, Union
logger = logging.getLogger(__name__)
class Process:
"""Maintain a process as coroutine"""
def __init__(
self, executable: str, arguments: List[str] = None, env: Dict[str, str] = None
):
"""Constructor
:param str executable: Path to the executable
:param List[str] arguments: arguments
"""
self._executable = executable
self._args = arguments or []
self._env = env
self._process: asyncio.subprocess.Process = None
async def run(self) -> asyncio.subprocess.Process:
logger.debug(
f"Spawning process [{self._executable} {' '.join(str(arg) for arg in self._args)}]"
)
self._process = await asyncio.subprocess.create_subprocess_exec(
self._executable,
*self._args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=self._env,
)
return self._process
@property
def pid(self) -> Union[int, None]:
"""Return the pid of the process
:return: The PID, or None if not running
:rtype: int | None
"""
if self._process is None:
return None
return self._process.pid
def kill(self):
"""Kill the process
:raises RuntimeError: if not running
"""
if self._process is None:
raise RuntimeError("Not running")
self._process.kill()
def terminate(self):
"""Terminate the process
:raises RuntimeError: if not running
"""
if self._process is None:
raise RuntimeError("Not running")
self._process.terminate()
def return_code(self) -> Union[int, None]:
"""Get the return code
:return: The return code, if the process is terminated; otherwise None
:rtype: int | None
"""
if self._process is None:
return None
return self._process.returncode
    def is_running(self) -> bool:
        """Check if the process is running
        :return: True if still running
        :rtype: bool
        """
        return self.pid is not None and self.return_code() is None
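
A minimal usage sketch of this wrapper (using /bin/echo as a stand-in executable):

```python
import asyncio

import lib.process


async def main():
    proc = lib.process.Process(executable="/bin/echo", arguments=["hello"])
    handle = await proc.run()
    await handle.wait()
    print(await handle.stdout.read(-1))  # b'hello\n'
    print(proc.return_code())  # 0


asyncio.run(main())
```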

View File: lib/work_directory.py

@@ -0,0 +1,103 @@
"""Maintains work directories for FDB """
import logging
import os
import os.path
import shutil
import tempfile
from typing import Union, List
logger = logging.getLogger(__name__)
class WorkDirectory:
def __init__(
self,
base_directory: str = None,
data_directory: str = "data/",
log_directory: str = "log/",
auto_cleanup: bool = False,
):
"""Constructor
        :param str base_directory: Base directory; a temporary directory is used if None
        :param str data_directory: Data directory, relative to base_directory
        :param str log_directory: Log directory, relative to base_directory
        :param bool auto_cleanup: Automatically delete the directories on exit, defaults to False
"""
self._data_directory_rel = data_directory
self._log_directory_rel = log_directory
self._base_directory = base_directory
self._data_directory = None
self._log_directory = None
self._pwd = os.getcwd()
self._auto_cleanup = auto_cleanup
@property
def base_directory(self) -> str:
"""Base directory
:return str:
"""
return self._base_directory
@property
def data_directory(self) -> str:
"""Data directory
:return str:
"""
if self._base_directory is None:
return None
return self._data_directory
@property
def log_directory(self) -> str:
"""Log directory
:return str:
"""
if self._base_directory is None:
return None
return self._log_directory
def setup(self):
"""Set up the directories
"""
if self._base_directory is None:
self._base_directory = tempfile.mkdtemp()
logger.debug(f"Work directory {self.base_directory}")
self._data_directory = os.path.join(self._base_directory, self._data_directory_rel)
os.makedirs(self.data_directory, exist_ok=True)
logger.debug(f"Created data directory {self.data_directory}")
self._log_directory = os.path.join(self._base_directory, self._log_directory_rel)
os.makedirs(self.log_directory, exist_ok=True)
os.chdir(self.base_directory)
logger.debug(f"Created log directory {self.log_directory}")
def teardown(self):
"""Tear down the directories
"""
shutil.rmtree(self.base_directory)
        logger.debug(f"Cleaned up directory {self.base_directory}")
def __enter__(self):
"""Enter the context"""
self.setup()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the context"""
os.chdir(self._pwd)
if self._auto_cleanup:
self.teardown()
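
A minimal sketch of the context-manager usage (note that setup() changes the current working directory to the base directory until the context exits):

```python
import lib.work_directory

# Creates a temporary base directory with data/ and log/ subdirectories;
# auto_cleanup=True removes everything on exit
with lib.work_directory.WorkDirectory(auto_cleanup=True) as directory:
    print(directory.base_directory)
    print(directory.data_directory)
    print(directory.log_directory)
```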

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
""" Runs a local FoundationDB cluster
"""
import argparse
import asyncio
import logging
import os
import os.path
import sys
import lib.fdb_process
import lib.local_cluster
logger = logging.getLogger(__name__)
def _setup_logs(log_level: int = logging.INFO):
log_format = logging.Formatter(
"%(asctime)s | %(name)20s :: %(levelname)-8s :: %(message)s"
)
logger.handlers.clear()
    stderr_handler = logging.StreamHandler(stream=sys.stderr)
    stderr_handler.setLevel(log_level)
    stderr_handler.setFormatter(log_format)
    logger.addHandler(stderr_handler)
logger.setLevel(log_level)
# Here we might lose some of the logging from lib
lib_logger = logging.getLogger("lib")
lib_logger.setLevel(log_level)
def _setup_args() -> argparse.Namespace:
"""Parse the command line arguments"""
parser = argparse.ArgumentParser(os.path.basename(__file__))
parser.add_argument(
"-n", "--num-processes", type=int, default=1, help="Number of FDB processes"
)
parser.add_argument(
"-W", "--work-dir", type=str, default=None, help="Work directory"
)
parser.add_argument(
"--debug", action="store_true", default=False, help="Debug logging"
)
parser.add_argument("--cluster-file", type=str, default=None, help="Cluster file")
parser.add_argument(
"--fdbserver-path", type=str, default=None, help="Path to fdbserver"
)
parser.add_argument("--fdbcli-path", type=str, default=None, help="Path to fdbcli")
parser.add_argument(
"--port", type=int, default=4000, help="Port for the first process"
)
return parser.parse_args()
async def run_fdbservers(num_processes, work_dir, cluster_file, port):
async with lib.local_cluster.FDBServerLocalCluster(
num_processes, work_dir, cluster_file, port
):
        # Keep the cluster alive for 20 seconds; it is terminated on context exit
        await asyncio.sleep(20)
def main():
args = _setup_args()
_setup_logs(logging.DEBUG if args.debug else logging.INFO)
if args.num_processes < 1:
raise RuntimeError(f"Must spawn more than 1 process, got {args.num_processes}")
    lib.fdb_process.set_fdbserver_path(args.fdbserver_path)
    lib.fdb_process.set_fdbcli_path(args.fdbcli_path)
    asyncio.run(
        run_fdbservers(args.num_processes, args.work_dir, args.cluster_file, args.port)
    )
return 0
if __name__ == "__main__":
sys.exit(main())
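
A hypothetical invocation of this script (the script name is assumed; the flags come from the parser above):

```
$ ./run_local_cluster.py -n 4 -W /tmp/fdb_work --port 4500 --debug
```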