Upgrade Tests: Refactoring LocalCluster to enable configuration updates
This commit is contained in:
parent
34cd80818b
commit
70c60c69b8
|
@ -4,10 +4,10 @@ import os
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from local_cluster import LocalCluster
|
from local_cluster import random_secret_string
|
||||||
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from random import choice
|
|
||||||
|
|
||||||
class ClusterFileGenerator:
|
class ClusterFileGenerator:
|
||||||
def __init__(self, output_dir: str):
|
def __init__(self, output_dir: str):
|
||||||
|
@ -15,8 +15,7 @@ class ClusterFileGenerator:
|
||||||
assert self.output_dir.exists(), "{} does not exist".format(output_dir)
|
assert self.output_dir.exists(), "{} does not exist".format(output_dir)
|
||||||
assert self.output_dir.is_dir(), "{} is not a directory".format(output_dir)
|
assert self.output_dir.is_dir(), "{} is not a directory".format(output_dir)
|
||||||
self.tmp_dir = self.output_dir.joinpath(
|
self.tmp_dir = self.output_dir.joinpath(
|
||||||
'tmp',
|
'tmp', random_secret_string(16))
|
||||||
''.join(choice(LocalCluster.valid_letters_for_secret) for i in range(16)))
|
|
||||||
self.tmp_dir.mkdir(parents=True)
|
self.tmp_dir.mkdir(parents=True)
|
||||||
self.cluster_file_path = self.tmp_dir.joinpath('fdb.cluster')
|
self.cluster_file_path = self.tmp_dir.joinpath('fdb.cluster')
|
||||||
|
|
||||||
|
@ -46,8 +45,10 @@ if __name__ == '__main__':
|
||||||
The environment variable FDB_CLUSTER_FILE is set to the generated cluster file for the command if
|
The environment variable FDB_CLUSTER_FILE is set to the generated cluster file for the command if
|
||||||
it is not set already.
|
it is not set already.
|
||||||
""")
|
""")
|
||||||
parser.add_argument('--output-dir', '-o', metavar='OUTPUT_DIRECTORY', help='Directory where output files are written', required=True)
|
parser.add_argument('--output-dir', '-o', metavar='OUTPUT_DIRECTORY',
|
||||||
parser.add_argument('cmd', metavar="COMMAND", nargs="+", help="The command to run")
|
help='Directory where output files are written', required=True)
|
||||||
|
parser.add_argument('cmd', metavar="COMMAND",
|
||||||
|
nargs="+", help="The command to run")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
errcode = 1
|
errcode = 1
|
||||||
|
|
||||||
|
@ -61,7 +62,9 @@ if __name__ == '__main__':
|
||||||
cmd_args.append(cmd)
|
cmd_args.append(cmd)
|
||||||
|
|
||||||
env = dict(**os.environ)
|
env = dict(**os.environ)
|
||||||
env['FDB_CLUSTER_FILE'] = env.get('FDB_CLUSTER_FILE', generator.cluster_file_path)
|
env['FDB_CLUSTER_FILE'] = env.get(
|
||||||
errcode = subprocess.run(cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode
|
'FDB_CLUSTER_FILE', generator.cluster_file_path)
|
||||||
|
errcode = subprocess.run(
|
||||||
|
cmd_args, stdout=sys.stdout, stderr=sys.stderr, env=env).returncode
|
||||||
|
|
||||||
sys.exit(errcode)
|
sys.exit(errcode)
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from argparse import ArgumentParser
|
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import os
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
|
|
||||||
|
@ -25,6 +24,13 @@ def get_free_port():
|
||||||
return port
|
return port
|
||||||
|
|
||||||
|
|
||||||
|
valid_letters_for_secret = string.ascii_letters + string.digits
|
||||||
|
|
||||||
|
|
||||||
|
def random_secret_string(len):
|
||||||
|
return ''.join(random.choice(valid_letters_for_secret) for i in range(len))
|
||||||
|
|
||||||
|
|
||||||
class LocalCluster:
|
class LocalCluster:
|
||||||
configuration_template = """
|
configuration_template = """
|
||||||
## foundationdb.conf
|
## foundationdb.conf
|
||||||
|
@ -48,7 +54,7 @@ cluster-file = {etcdir}/fdb.cluster
|
||||||
## Default parameters for individual fdbserver processes
|
## Default parameters for individual fdbserver processes
|
||||||
[fdbserver]
|
[fdbserver]
|
||||||
command = {fdbserver_bin}
|
command = {fdbserver_bin}
|
||||||
public-address = auto:$ID
|
public-address = {ip_address}:$ID
|
||||||
listen-address = public
|
listen-address = public
|
||||||
datadir = {datadir}/$ID
|
datadir = {datadir}/$ID
|
||||||
logdir = {logdir}
|
logdir = {logdir}
|
||||||
|
@ -65,76 +71,107 @@ logdir = {logdir}
|
||||||
|
|
||||||
## An individual fdbserver process with id 4000
|
## An individual fdbserver process with id 4000
|
||||||
## Parameters set here override defaults from the [fdbserver] section
|
## Parameters set here override defaults from the [fdbserver] section
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
valid_letters_for_secret = string.ascii_letters + string.digits
|
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str, fdbcli_binary: str,
|
||||||
|
process_number: int, create_config=True, port=None, ip_address=None):
|
||||||
def __init__(self, basedir: str, fdbserver_binary: str, fdbmonitor_binary: str,
|
|
||||||
fdbcli_binary: str, process_number: int, create_config=True, port=None, ip_address=None):
|
|
||||||
self.basedir = Path(basedir)
|
self.basedir = Path(basedir)
|
||||||
|
self.etc = self.basedir.joinpath('etc')
|
||||||
|
self.log = self.basedir.joinpath('log')
|
||||||
|
self.data = self.basedir.joinpath('data')
|
||||||
|
self.conf_file = self.etc.joinpath('foundationdb.conf')
|
||||||
|
self.cluster_file = self.etc.joinpath('fdb.cluster')
|
||||||
self.fdbserver_binary = Path(fdbserver_binary)
|
self.fdbserver_binary = Path(fdbserver_binary)
|
||||||
self.fdbmonitor_binary = Path(fdbmonitor_binary)
|
self.fdbmonitor_binary = Path(fdbmonitor_binary)
|
||||||
self.fdbcli_binary = Path(fdbcli_binary)
|
self.fdbcli_binary = Path(fdbcli_binary)
|
||||||
for b in (self.fdbserver_binary, self.fdbmonitor_binary, self.fdbcli_binary):
|
for b in (self.fdbserver_binary, self.fdbmonitor_binary, self.fdbcli_binary):
|
||||||
assert b.exists(), "{} does not exist".format(b)
|
assert b.exists(), "{} does not exist".format(b)
|
||||||
if not self.basedir.exists():
|
|
||||||
self.basedir.mkdir()
|
|
||||||
self.etc = self.basedir.joinpath('etc')
|
|
||||||
self.log = self.basedir.joinpath('log')
|
|
||||||
self.data = self.basedir.joinpath('data')
|
|
||||||
self.etc.mkdir(exist_ok=True)
|
self.etc.mkdir(exist_ok=True)
|
||||||
self.log.mkdir(exist_ok=True)
|
self.log.mkdir(exist_ok=True)
|
||||||
self.data.mkdir(exist_ok=True)
|
self.data.mkdir(exist_ok=True)
|
||||||
self.port = get_free_port() if port is None else port
|
self.process_number = process_number
|
||||||
self.ip_address = '127.0.0.1' if ip_address is None else ip_address
|
self.ip_address = '127.0.0.1' if ip_address is None else ip_address
|
||||||
|
self.first_port = port
|
||||||
|
if (self.first_port is not None):
|
||||||
|
self.last_used_port = self.first_port-1
|
||||||
|
self.server_ports = [self.__next_port()
|
||||||
|
for _ in range(self.process_number)]
|
||||||
|
self.cluster_desc = random_secret_string(8)
|
||||||
|
self.cluster_secret = random_secret_string(8)
|
||||||
self.running = False
|
self.running = False
|
||||||
self.process = None
|
self.process = None
|
||||||
self.fdbmonitor_logfile = None
|
self.fdbmonitor_logfile = None
|
||||||
if create_config:
|
if create_config:
|
||||||
with open(self.etc.joinpath('fdb.cluster'), 'x') as f:
|
self.create_cluster_file()
|
||||||
random_string = lambda len : ''.join(random.choice(LocalCluster.valid_letters_for_secret) for i in range(len))
|
self.save_config()
|
||||||
f.write('{desc}:{secret}@{ip_addr}:{server_port}'.format(
|
|
||||||
desc=random_string(8),
|
def __next_port(self):
|
||||||
secret=random_string(8),
|
if (self.first_port is None):
|
||||||
ip_addr=self.ip_address,
|
return get_free_port()
|
||||||
server_port=self.port
|
else:
|
||||||
))
|
self.last_used_port += 1
|
||||||
with open(self.etc.joinpath('foundationdb.conf'), 'x') as f:
|
return self.last_used_port
|
||||||
|
|
||||||
|
def save_config(self):
|
||||||
|
new_conf_file = self.conf_file.parent / (self.conf_file.name + '.new')
|
||||||
|
with open(new_conf_file, 'x') as f:
|
||||||
f.write(LocalCluster.configuration_template.format(
|
f.write(LocalCluster.configuration_template.format(
|
||||||
etcdir=self.etc,
|
etcdir=self.etc,
|
||||||
fdbserver_bin=self.fdbserver_binary,
|
fdbserver_bin=self.fdbserver_binary,
|
||||||
datadir=self.data,
|
datadir=self.data,
|
||||||
logdir=self.log
|
logdir=self.log,
|
||||||
|
ip_address=self.ip_address
|
||||||
))
|
))
|
||||||
# By default, the cluster only has one process
|
# By default, the cluster only has one process
|
||||||
# If a port number is given and process_number > 1, we will use subsequent numbers
|
# If a port number is given and process_number > 1, we will use subsequent numbers
|
||||||
# E.g., port = 4000, process_number = 5
|
# E.g., port = 4000, process_number = 5
|
||||||
# Then 4000,4001,4002,4003,4004 will be used as ports
|
# Then 4000,4001,4002,4003,4004 will be used as ports
|
||||||
# If port number is not given, we will randomly pick free ports
|
# If port number is not given, we will randomly pick free ports
|
||||||
for index, _ in enumerate(range(process_number)):
|
for port in self.server_ports:
|
||||||
f.write('[fdbserver.{server_port}]\n'.format(server_port=self.port))
|
f.write('[fdbserver.{server_port}]\n'.format(
|
||||||
self.port = get_free_port() if port is None else str(int(self.port) + 1)
|
server_port=port))
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
|
||||||
def __enter__(self):
|
os.replace(new_conf_file, self.conf_file)
|
||||||
|
|
||||||
|
def create_cluster_file(self):
|
||||||
|
with open(self.cluster_file, 'x') as f:
|
||||||
|
f.write('{desc}:{secret}@{ip_addr}:{server_port}'.format(
|
||||||
|
desc=self.cluster_desc,
|
||||||
|
secret=self.cluster_secret,
|
||||||
|
ip_addr=self.ip_address,
|
||||||
|
server_port=self.server_ports[0]
|
||||||
|
))
|
||||||
|
|
||||||
|
def start_cluster(self):
|
||||||
assert not self.running, "Can't start a server that is already running"
|
assert not self.running, "Can't start a server that is already running"
|
||||||
args = [str(self.fdbmonitor_binary),
|
args = [str(self.fdbmonitor_binary),
|
||||||
'--conffile',
|
'--conffile',
|
||||||
str(self.etc.joinpath('foundationdb.conf')),
|
str(self.etc.joinpath('foundationdb.conf')),
|
||||||
'--lockfile',
|
'--lockfile',
|
||||||
str(self.etc.joinpath('fdbmonitor.lock'))]
|
str(self.etc.joinpath('fdbmonitor.lock'))]
|
||||||
self.fdbmonitor_logfile = open(self.log.joinpath('fdbmonitor.log'), 'w')
|
self.fdbmonitor_logfile = open(
|
||||||
self.process = subprocess.Popen(args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile)
|
self.log.joinpath('fdbmonitor.log'), 'w')
|
||||||
|
self.process = subprocess.Popen(
|
||||||
|
args, stdout=self.fdbmonitor_logfile, stderr=self.fdbmonitor_logfile)
|
||||||
self.running = True
|
self.running = True
|
||||||
return self
|
|
||||||
|
|
||||||
def __exit__(self, xc_type, exc_value, traceback):
|
def stop_cluster(self):
|
||||||
assert self.running, "Server is not running"
|
assert self.running, "Server is not running"
|
||||||
if self.process.poll() is None:
|
if self.process.poll() is None:
|
||||||
self.process.terminate()
|
self.process.terminate()
|
||||||
|
self.running = True
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.start_cluster()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, xc_type, exc_value, traceback):
|
||||||
|
self.stop_cluster()
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
def create_database(self, storage='ssd'):
|
def create_database(self, storage='ssd'):
|
||||||
args = [self.fdbcli_binary, '-C', self.etc.joinpath('fdb.cluster'), '--exec',
|
args = [self.fdbcli_binary, '-C', self.cluster_file, '--exec',
|
||||||
'configure new single {}'.format(storage)]
|
'configure new single {}'.format(storage)]
|
||||||
subprocess.run(args)
|
subprocess.run(args)
|
||||||
|
|
|
@ -5,9 +5,8 @@ import os
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from local_cluster import LocalCluster
|
from local_cluster import LocalCluster, random_secret_string
|
||||||
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
||||||
from random import choice
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
@ -18,8 +17,7 @@ class TempCluster:
|
||||||
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
|
assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
|
||||||
tmp_dir = self.build_dir.joinpath(
|
tmp_dir = self.build_dir.joinpath(
|
||||||
"tmp",
|
"tmp",
|
||||||
"".join(choice(LocalCluster.valid_letters_for_secret)
|
random_secret_string(16)
|
||||||
for i in range(16)),
|
|
||||||
)
|
)
|
||||||
tmp_dir.mkdir(parents=True)
|
tmp_dir.mkdir(parents=True)
|
||||||
self.cluster = LocalCluster(
|
self.cluster = LocalCluster(
|
||||||
|
|
Loading…
Reference in New Issue