Cluster wiggle test
parent 19e7b13eb2
commit fab8f35683

Changed paths: bindings/c, tests/TestRunner
CMake test registration under bindings/c (exact file path not shown in the extract; the hunk extends the apitester CTest entries):

@@ -333,6 +333,27 @@ endif()
         --upgrade-path "7.0.0" "7.2.0"
         --process-number 3
     )
+
+    add_test(NAME fdb_c_cluster_wiggle
+        COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
+        --build-dir ${CMAKE_BINARY_DIR}
+        --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
+        --upgrade-path "7.2.0" "wiggle"
+        --disable-log-dump
+        --process-number 3
+        --redundancy double
+    )
+
+    add_test(NAME fdb_c_wiggle_and_upgrade
+        COMMAND ${CMAKE_SOURCE_DIR}/tests/TestRunner/upgrade_test.py
+        --build-dir ${CMAKE_BINARY_DIR}
+        --test-file ${CMAKE_SOURCE_DIR}/bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml
+        --upgrade-path "7.0.0" "wiggle" "7.2.0"
+        --disable-log-dump
+        --process-number 3
+        --redundancy double
+    )
+
 endif()

 endif()
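The two new CTest entries exercise the wiggle both on its own (7.2.0 -> wiggle) and in the middle of an upgrade (7.0.0 -> wiggle -> 7.2.0). As a rough sketch, the first entry amounts to the following manual invocation; the build directory below is a placeholder, not a path from this commit:

    # Sketch: run the new wiggle test by hand, outside of CTest.
    # "/path/to/build" is a placeholder for an actual build directory.
    import subprocess

    subprocess.run(
        [
            "python3", "tests/TestRunner/upgrade_test.py",
            "--build-dir", "/path/to/build",
            "--test-file", "bindings/c/test/apitester/tests/upgrade/MixedApiWorkloadMultiThr.toml",
            "--upgrade-path", "7.2.0", "wiggle",
            "--disable-log-dump",
            "--process-number", "3",
            "--redundancy", "double",
        ],
        check=True,
    )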
C API tester source under bindings/c (C++, namespace FdbApiTester):

@@ -33,8 +33,8 @@
 namespace FdbApiTester {

-constexpr int LONG_WAIT_TIME_US = 1000000;
-constexpr int LARGE_NUMBER_OF_RETRIES = 5;
+constexpr int LONG_WAIT_TIME_US = 2000000;
+constexpr int LARGE_NUMBER_OF_RETRIES = 10;

 void TransactionActorBase::complete(fdb_error_t err) {
 	error = err;
Local cluster harness under tests/TestRunner (LocalCluster class; exact file path not shown in the extract):

@@ -7,6 +7,10 @@ import os
 import socket
 import time

+CLUSTER_UPDATE_TIMEOUT_SEC = 10
+EXCLUDE_SERVERS_TIMEOUT_SEC = 120
+RETRY_INTERVAL_SEC = 0.5
+

 def _get_free_port_internal():
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
@@ -94,6 +98,7 @@ logdir = {logdir}
         port=None,
         ip_address=None,
         blob_granules_enabled: bool = False,
+        redundancy: str = "single"
     ):
         self.basedir = Path(basedir)
         self.etc = self.basedir.joinpath("etc")
@@ -110,6 +115,7 @@ logdir = {logdir}
         self.log.mkdir(exist_ok=True)
         self.data.mkdir(exist_ok=True)
         self.process_number = process_number
+        self.redundancy = redundancy
         self.ip_address = "127.0.0.1" if ip_address is None else ip_address
         self.first_port = port
         self.blob_granules_enabled = blob_granules_enabled
@@ -119,7 +125,9 @@ logdir = {logdir}

         if self.first_port is not None:
             self.last_used_port = int(self.first_port) - 1
-        self.server_ports = [self.__next_port() for _ in range(self.process_number)]
+        self.server_ports = {server_id: self.__next_port() for server_id in range(self.process_number)}
+        self.server_by_port = {port: server_id for server_id, port in self.server_ports.items()}
+        self.next_server_id = self.process_number
         self.cluster_desc = random_secret_string(8)
         self.cluster_secret = random_secret_string(8)
         self.env_vars = {}
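Port bookkeeping moves from a plain list to a pair of dictionaries keyed by a stable server ID, so that servers can later be added and removed individually. A toy illustration of the invariant (port numbers made up for the example):

    # Toy illustration of the two-way mapping (ports are made up).
    server_ports = {0: 4000, 1: 4001, 2: 4002}                    # server_id -> port
    server_by_port = {p: sid for sid, p in server_ports.items()}  # port -> server_id
    assert all(server_by_port[p] == sid for sid, p in server_ports.items())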
@@ -127,6 +135,8 @@ logdir = {logdir}
         self.process = None
         self.fdbmonitor_logfile = None
         self.use_legacy_conf_syntax = False
+        self.coordinators = set()
+        self.active_servers = set(self.server_ports.keys())

         if create_config:
             self.create_cluster_file()
@@ -163,11 +173,15 @@ logdir = {logdir}
         # E.g., port = 4000, process_number = 5
         # Then 4000,4001,4002,4003,4004 will be used as ports
         # If port number is not given, we will randomly pick free ports
-        for port in self.server_ports:
-            f.write("[fdbserver.{server_port}]\n".format(server_port=port))
+        for server_id in self.active_servers:
+            f.write("[fdbserver.{server_port}]\n".format(server_port=self.server_ports[server_id]))
+            if self.use_legacy_conf_syntax:
+                f.write("machine_id = {}\n".format(server_id))
+            else:
+                f.write("machine-id = {}\n".format(server_id))
             if self.blob_granules_enabled:
                 # make last process a blob_worker class
-                f.write("class = blob_worker")
+                f.write("class = blob_worker\n")
         f.flush()
         os.fsync(f.fileno())
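For a server with ID 0 on port 4000 (and legacy syntax off), the loop above would emit roughly the following section into the monitor config; the blob_worker class line is appended only when blob granules are enabled:

    [fdbserver.4000]
    machine-id = 0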
@@ -183,6 +197,7 @@ logdir = {logdir}
                     server_port=self.server_ports[0],
                 )
             )
+        self.coordinators = {0}

     def start_cluster(self):
         assert not self.running, "Can't start a server that is already running"
@@ -212,7 +227,8 @@ logdir = {logdir}
         sec = 0
         while sec < timeout_sec:
             in_use = False
-            for port in self.server_ports:
+            for server_id in self.active_servers:
+                port = self.server_ports[server_id]
                 if is_port_in_use(port):
                     print("Port {} in use. Waiting for it to be released".format(port))
                     in_use = True
@@ -230,37 +246,60 @@ logdir = {logdir}
     def __exit__(self, xc_type, exc_value, traceback):
         self.stop_cluster()

+    def __fdbcli_exec(self, cmd, stdout, stderr, timeout):
+        args = [self.fdbcli_binary, "-C", self.cluster_file, "--exec", cmd]
+        res = subprocess.run(args, env=self.process_env(), stderr=stderr, stdout=stdout, timeout=timeout)
+        assert res.returncode == 0, "fdbcli command {} failed with {}".format(cmd, res.returncode)
+        return res.stdout
+
+    # Execute a fdbcli command
+    def fdbcli_exec(self, cmd, timeout=None):
+        self.__fdbcli_exec(cmd, None, None, timeout)
+
+    # Execute a fdbcli command and return its output
+    def fdbcli_exec_and_get(self, cmd, timeout=None):
+        return self.__fdbcli_exec(cmd, subprocess.PIPE, None, timeout)
+
     def create_database(self, storage="ssd", enable_tenants=True):
-        db_config = "configure new single {}".format(storage)
+        db_config = "configure new {} {}".format(self.redundancy, storage)
         if enable_tenants:
             db_config += " tenant_mode=optional_experimental"
         if self.blob_granules_enabled:
             db_config += " blob_granules_enabled:=1"
-        args = [self.fdbcli_binary, "-C", self.cluster_file, "--exec", db_config]
-
-        res = subprocess.run(args, env=self.process_env())
-        assert res.returncode == 0, "Create database failed with {}".format(
-            res.returncode
-        )
+        self.fdbcli_exec(db_config)

         if self.blob_granules_enabled:
-            bg_args = [
-                self.fdbcli_binary,
-                "-C",
-                self.cluster_file,
-                "--exec",
-                "blobrange start \\x00 \\xff",
-            ]
-            bg_res = subprocess.run(bg_args, env=self.process_env())
-            assert bg_res.returncode == 0, "Start blob granules failed with {}".format(
-                bg_res.returncode
-            )
+            self.fdbcli_exec("blobrange start \\x00 \\xff")

     # Get cluster status using fdbcli
     def get_status(self):
-        args = [self.fdbcli_binary, "-C", self.cluster_file, "--exec", "status json"]
-        res = subprocess.run(args, env=self.process_env(), stdout=subprocess.PIPE)
-        assert res.returncode == 0, "Get status failed with {}".format(res.returncode)
-        return json.loads(res.stdout)
+        status_output = self.fdbcli_exec_and_get("status json")
+        return json.loads(status_output)
+
+    # Get the set of servers from the cluster status matching the given filter
+    def get_servers_from_status(self, filter):
+        status = self.get_status()
+        if "processes" not in status["cluster"]:
+            return set()
+
+        servers_found = set()
+        addresses = [proc_info["address"] for proc_info in status["cluster"]["processes"].values() if filter(proc_info)]
+        for addr in addresses:
+            port = int(addr.split(":", 1)[1])
+            assert port in self.server_by_port, "Unknown server port {}".format(port)
+            servers_found.add(self.server_by_port[port])
+
+        return servers_found
+
+    # Get the set of all servers from the cluster status
+    def get_all_servers_from_status(self):
+        return self.get_servers_from_status(lambda _: True)
+
+    # Get the set of all servers with coordinator role from the cluster status
+    def get_coordinators_from_status(self):
+        def is_coordinator(proc_status):
+            return any(entry["role"] == "coordinator" for entry in proc_status["roles"])
+        return self.get_servers_from_status(is_coordinator)

     def process_env(self):
         env = dict(os.environ)
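Note that get_servers_from_status now returns an empty set (not an empty dict, as the raw extract showed) so that comparisons against the server-ID sets behave consistently. All fdbcli plumbing is funneled through the two thin wrappers; a brief usage sketch, assuming a started LocalCluster instance named cluster:

    # Usage sketch (assumes a started LocalCluster instance named `cluster`).
    cluster.fdbcli_exec("status minimal")                # run a command, discard output
    status = cluster.get_status()                        # parsed `status json` output
    all_ids = cluster.get_all_servers_from_status()      # set of known server IDs
    coord_ids = cluster.get_coordinators_from_status()   # IDs carrying the coordinator role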
@@ -269,3 +308,102 @@ logdir = {logdir}

     def set_env_var(self, var_name, var_val):
         self.env_vars[var_name] = var_val
+
+    # Add a new server process to the cluster and return its ID
+    # Need to call save_config to apply the changes
+    def add_server(self):
+        server_id = self.next_server_id
+        assert server_id not in self.server_ports, "Server ID {} is already in use".format(server_id)
+        self.next_server_id += 1
+        port = self.__next_port()
+        self.server_ports[server_id] = port
+        self.server_by_port[port] = server_id
+        self.active_servers.add(server_id)
+        return server_id
+
+    # Remove the server with the given ID from the cluster
+    # Need to call save_config to apply the changes
+    def remove_server(self, server_id):
+        assert server_id in self.active_servers, "Server {} does not exist".format(server_id)
+        self.active_servers.remove(server_id)
+
+    # Wait until changes to the set of servers (additions & removals) are applied
+    def wait_for_server_update(self, timeout=CLUSTER_UPDATE_TIMEOUT_SEC):
+        time_limit = time.time() + timeout
+        servers_found = set()
+        while time.time() <= time_limit:
+            servers_found = self.get_all_servers_from_status()
+            if servers_found == self.active_servers:
+                break
+            time.sleep(RETRY_INTERVAL_SEC)
+        assert servers_found == self.active_servers, \
+            "Failed to apply server changes after {}sec. Expected: {}, Actual: {}".format(
+                timeout, self.active_servers, servers_found)
+
+    # Apply changes to the set of the coordinators, based on the current value of self.coordinators
+    def update_coordinators(self):
+        urls = ["{}:{}".format(self.ip_address, self.server_ports[id]) for id in self.coordinators]
+        self.fdbcli_exec("coordinators {}".format(" ".join(urls)))
+
+    # Wait until the changes to the set of the coordinators are applied
+    def wait_for_coordinator_update(self, timeout=CLUSTER_UPDATE_TIMEOUT_SEC):
+        time_limit = time.time() + timeout
+        coord_found = set()
+        while time.time() <= time_limit:
+            coord_found = self.get_coordinators_from_status()
+            if coord_found == self.coordinators:
+                break
+            time.sleep(RETRY_INTERVAL_SEC)
+        assert coord_found == self.coordinators, \
+            "Failed to apply coordinator changes after {}sec. Expected: {}, Actual: {}".format(
+                timeout, self.coordinators, coord_found)
+        # Check if the cluster file was successfully updated too
+        with open(self.cluster_file, "r") as f:
+            connection_string = f.read()
+        for server_id in self.coordinators:
+            assert connection_string.find(str(self.server_ports[server_id])) != -1, \
+                "Missing coordinator {} port {} in the cluster file".format(server_id, self.server_ports[server_id])
+
+    # Exclude the servers with the given ID from the cluster, i.e. move out their data
+    # The method waits until the changes are applied
+    def exclude_servers(self, server_ids):
+        urls = ["{}:{}".format(self.ip_address, self.server_ports[id]) for id in server_ids]
+        self.fdbcli_exec("exclude FORCE {}".format(" ".join(urls)), timeout=EXCLUDE_SERVERS_TIMEOUT_SEC)
+
+    # Perform a cluster wiggle: replace all servers with new ones
+    def cluster_wiggle(self):
+        old_servers = self.active_servers.copy()
+        new_servers = set()
+        print("Starting cluster wiggle")
+        print("Old servers: {} on ports {}".format(
+            old_servers, [self.server_ports[server_id] for server_id in old_servers]))
+        print("Old coordinators: {}".format(self.coordinators))
+
+        # Step 1: add new servers
+        start_time = time.time()
+        for _ in range(len(old_servers)):
+            new_servers.add(self.add_server())
+        print("New servers: {} on ports {}".format(
+            new_servers, [self.server_ports[server_id] for server_id in new_servers]))
+        self.save_config()
+        self.wait_for_server_update()
+        print("New servers successfully added to the cluster. Time: {}s".format(time.time() - start_time))
+
+        # Step 2: change coordinators
+        start_time = time.time()
+        new_coordinators = set(random.sample(list(new_servers), len(self.coordinators)))
+        print("New coordinators: {}".format(new_coordinators))
+        self.coordinators = new_coordinators.copy()
+        self.update_coordinators()
+        self.wait_for_coordinator_update()
+        print("Coordinators successfully changed. Time: {}s".format(time.time() - start_time))
+
+        # Step 3: exclude old servers from the cluster, i.e. move out their data
+        start_time = time.time()
+        self.exclude_servers(old_servers)
+        print("Old servers successfully excluded from the cluster. Time: {}s".format(time.time() - start_time))
+
+        # Step 4: remove the old servers
+        start_time = time.time()
+        for server_id in old_servers:
+            self.remove_server(server_id)
+        self.save_config()
+        self.wait_for_server_update()
+        print("Old servers successfully removed from the cluster. Time: {}s".format(time.time() - start_time))
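Two fixes relative to the raw extract: both wait loops now break when the observed set matches the expected one and assert on that equality (the extracted version asserted on a bare message string, which always passes), and random.sample draws from a list, since sampling from a set is rejected by newer Python versions. Under the same assumption as before (a started LocalCluster named cluster), replacing a single server by hand with these primitives would look roughly like:

    # Sketch: replace server 0 by hand using the new primitives
    # (assumes a started LocalCluster instance named `cluster`).
    new_id = cluster.add_server()
    cluster.save_config()
    cluster.wait_for_server_update()   # status now reports the new server

    cluster.exclude_servers([0])       # move data off server 0 (waits for completion)
    cluster.remove_server(0)
    cluster.save_config()
    cluster.wait_for_server_update()   # status no longer reports server 0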
tests/TestRunner/upgrade_test.py:

@@ -65,6 +65,7 @@ SUPPORTED_VERSIONS = [
     "5.1.7",
     "5.1.6",
 ]
+CLUSTER_ACTIONS = ["wiggle"]
 FDB_DOWNLOAD_ROOT = "https://github.com/apple/foundationdb/releases/download/"
 LOCAL_OLD_BINARY_REPO = "/opt/foundationdb/old/"
 CURRENT_VERSION = "7.2.0"
@@ -128,19 +129,15 @@ def read_to_str(filename):
 class UpgradeTest:
     def __init__(
         self,
-        build_dir: str,
-        upgrade_path: list,
-        process_number: int = 1,
-        port: str = None,
+        args
     ):
-        self.build_dir = Path(build_dir).resolve()
-        assert self.build_dir.exists(), "{} does not exist".format(build_dir)
-        assert self.build_dir.is_dir(), "{} is not a directory".format(build_dir)
-        self.upgrade_path = upgrade_path
-        for version in upgrade_path:
-            assert version in SUPPORTED_VERSIONS, "Unsupported version {}".format(
-                version
-            )
+        self.build_dir = Path(args.build_dir).resolve()
+        assert self.build_dir.exists(), "{} does not exist".format(args.build_dir)
+        assert self.build_dir.is_dir(), "{} is not a directory".format(args.build_dir)
+        self.upgrade_path = args.upgrade_path
+        self.used_versions = set(self.upgrade_path).difference(set(CLUSTER_ACTIONS))
+        for version in self.used_versions:
+            assert version in SUPPORTED_VERSIONS, "Unsupported version or cluster action {}".format(version)
         self.platform = platform.machine()
         assert self.platform in SUPPORTED_PLATFORMS, "Unsupported platform {}".format(
             self.platform
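With the argparse namespace passed straight through, used_versions is simply the upgrade path minus the cluster actions, so only real version strings are validated against SUPPORTED_VERSIONS. For example:

    # Example: cluster actions are filtered out before version validation.
    upgrade_path = ["7.0.0", "wiggle", "7.2.0"]
    used_versions = set(upgrade_path).difference(set(["wiggle"]))
    assert used_versions == {"7.0.0", "7.2.0"}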
@@ -153,15 +150,15 @@ class UpgradeTest:
             self.local_binary_repo = None
         self.download_old_binaries()
         self.create_external_lib_dir()
-        init_version = upgrade_path[0]
+        init_version = self.upgrade_path[0]
         self.cluster = LocalCluster(
             self.tmp_dir,
             self.binary_path(init_version, "fdbserver"),
             self.binary_path(init_version, "fdbmonitor"),
             self.binary_path(init_version, "fdbcli"),
-            process_number,
-            port=port,
+            args.process_number,
             create_config=False,
+            redundancy=args.redundancy
         )
         self.cluster.create_cluster_file()
         self.configure_version(init_version)
@@ -267,7 +264,7 @@ class UpgradeTest:

     # Download all old binaries required for testing the specified upgrade path
     def download_old_binaries(self):
-        for version in self.upgrade_path:
+        for version in self.used_versions:
             if version == CURRENT_VERSION:
                 continue

@@ -293,7 +290,7 @@ class UpgradeTest:
     def create_external_lib_dir(self):
         self.external_lib_dir = self.tmp_dir.joinpath("client_libs")
         self.external_lib_dir.mkdir(parents=True)
-        for version in self.upgrade_path:
+        for version in self.used_versions:
             src_file_path = self.lib_dir(version).joinpath("libfdb_c.so")
             assert src_file_path.exists(), "{} does not exist".format(src_file_path)
             target_file_path = self.external_lib_dir.joinpath(
@@ -313,7 +310,7 @@ class UpgradeTest:
                 time.sleep(1)
                 continue
             num_proc = len(status["cluster"]["processes"])
-            if num_proc < self.cluster.process_number:
+            if num_proc != self.cluster.process_number:
                 print(
                     "Health check: {} of {} processes found. Retrying".format(
                         num_proc, self.cluster.process_number
@@ -321,11 +318,6 @@ class UpgradeTest:
                 )
                 time.sleep(1)
                 continue
-            assert (
-                num_proc == self.cluster.process_number
-            ), "Number of processes: expected: {}, actual: {}".format(
-                self.cluster.process_number, num_proc
-            )
             for (_, proc_stat) in status["cluster"]["processes"].items():
                 proc_ver = proc_stat["version"]
                 assert (
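Taken together, the two hunks above make the health check retry on any process-count mismatch, presumably because a wiggle temporarily runs old and new servers side by side; the hard assert on the count, which would now fire spuriously, is dropped in favor of the retry loop.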
@@ -370,7 +362,7 @@ class UpgradeTest:
     # Determine FDB API version matching the upgrade path
     def determine_api_version(self):
         self.api_version = api_version_from_str(CURRENT_VERSION)
-        for version in self.upgrade_path:
+        for version in self.used_versions:
             self.api_version = min(api_version_from_str(version), self.api_version)

     # Start the tester to generate the workload specified by the test file
@@ -428,7 +420,6 @@ class UpgradeTest:
             os._exit(1)

     # Perform a progress check: Trigger it and wait until it is completed
-
     def progress_check(self):
         self.progress_event.clear()
         os.write(self.ctrl_pipe, b"CHECK\n")
@@ -464,11 +455,15 @@ class UpgradeTest:
         try:
             self.health_check()
             self.progress_check()
-            for version in self.upgrade_path[1:]:
-                random_sleep(0.0, 2.0)
-                self.upgrade_to(version)
-                self.health_check()
-                self.progress_check()
+            random_sleep(0.0, 2.0)
+            for entry in self.upgrade_path[1:]:
+                if entry == "wiggle":
+                    self.cluster.cluster_wiggle()
+                else:
+                    assert entry in self.used_versions, "Unexpected entry in the upgrade path: {}".format(entry)
+                    self.upgrade_to(entry)
+                self.health_check()
+                self.progress_check()
             os.write(self.ctrl_pipe, b"STOP\n")
         finally:
             os.close(self.ctrl_pipe)
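Read against the new CMake entries, this dispatch means that for --upgrade-path "7.0.0" "wiggle" "7.2.0" the test performs, after the initial checks on 7.0.0: a full cluster wiggle, a health and progress check, an upgrade to 7.2.0, and a final health and progress check, all while the tester workload keeps running.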
@@ -611,7 +606,8 @@ if __name__ == "__main__":
     parser.add_argument(
         "--upgrade-path",
         nargs="+",
-        help="Cluster upgrade path: a space separated list of versions",
+        help="Cluster upgrade path: a space separated list of versions.\n" +
+             "The list may also contain cluster change actions: {}".format(CLUSTER_ACTIONS),
         default=[CURRENT_VERSION],
     )
     parser.add_argument(
@@ -626,6 +622,12 @@ if __name__ == "__main__":
         type=int,
         default=0,
     )
+    parser.add_argument(
+        "--redundancy",
+        help="Database redundancy level (default: single)",
+        type=str,
+        default="single",
+    )
     parser.add_argument(
         "--disable-log-dump",
         help="Do not dump cluster log on error",
@@ -639,11 +641,14 @@ if __name__ == "__main__":
         args.process_number = random.randint(1, 5)
         print("Testing with {} processes".format(args.process_number))

+    assert len(args.upgrade_path) > 0, "Upgrade path must be specified"
+    assert args.upgrade_path[0] in SUPPORTED_VERSIONS, "Upgrade path must begin with a valid version number"
+
     if args.run_with_gdb:
         RUN_WITH_GDB = True

     errcode = 1
-    with UpgradeTest(args.build_dir, args.upgrade_path, args.process_number) as test:
+    with UpgradeTest(args) as test:
         print("log-dir: {}".format(test.log))
         print("etc-dir: {}".format(test.etc))
         print("data-dir: {}".format(test.data))