Temporarily disable client trace check for older version (#9578)

* Disable client trace check test on older version

Older version doesn't guarantee trace flush upon network::stop()
Comment it out for the time being

* Black-reformat authz and client config tester scripts
This commit is contained in:
Junhyun Shim 2023-03-06 15:36:28 +01:00 committed by GitHub
parent 9cbf5c8f0e
commit a2a29af56e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 580 additions and 168 deletions

View File

@ -38,7 +38,9 @@ class TestCluster(LocalCluster):
version: str,
):
self.client_config_tester_bin = Path(args.client_config_tester_bin).resolve()
assert self.client_config_tester_bin.exists(), "{} does not exist".format(self.client_config_tester_bin)
assert self.client_config_tester_bin.exists(), "{} does not exist".format(
self.client_config_tester_bin
)
self.build_dir = Path(args.build_dir).resolve()
assert self.build_dir.exists(), "{} does not exist".format(args.build_dir)
assert self.build_dir.is_dir(), "{} is not a directory".format(args.build_dir)
@ -68,7 +70,10 @@ class TestCluster(LocalCluster):
self.fdbmonitor_binary = downloader.binary_path(version, "fdbmonitor")
self.fdbserver_binary = downloader.binary_path(version, "fdbserver")
self.fdbcli_binary = downloader.binary_path(version, "fdbcli")
self.set_env_var("LD_LIBRARY_PATH", "%s:%s" % (downloader.lib_dir(version), os.getenv("LD_LIBRARY_PATH")))
self.set_env_var(
"LD_LIBRARY_PATH",
"%s:%s" % (downloader.lib_dir(version), os.getenv("LD_LIBRARY_PATH")),
)
self.save_config()
self.ensure_ports_released()
self.start_cluster()
@ -113,26 +118,43 @@ class ClientConfigTest:
self.external_lib_dir.mkdir(parents=True)
for version in versions:
src_file_path = downloader.lib_path(version)
self.tc.assertTrue(src_file_path.exists(), "{} does not exist".format(src_file_path))
target_file_path = self.external_lib_dir.joinpath("libfdb_c.{}.so".format(version))
self.tc.assertTrue(
src_file_path.exists(), "{} does not exist".format(src_file_path)
)
target_file_path = self.external_lib_dir.joinpath(
"libfdb_c.{}.so".format(version)
)
shutil.copyfile(src_file_path, target_file_path)
self.tc.assertTrue(target_file_path.exists(), "{} does not exist".format(target_file_path))
self.tc.assertTrue(
target_file_path.exists(), "{} does not exist".format(target_file_path)
)
def create_external_lib_path(self, version):
src_file_path = downloader.lib_path(version)
self.tc.assertTrue(src_file_path.exists(), "{} does not exist".format(src_file_path))
self.external_lib_path = self.test_dir.joinpath("libfdb_c.{}.so".format(version))
self.tc.assertTrue(
src_file_path.exists(), "{} does not exist".format(src_file_path)
)
self.external_lib_path = self.test_dir.joinpath(
"libfdb_c.{}.so".format(version)
)
shutil.copyfile(src_file_path, self.external_lib_path)
self.tc.assertTrue(self.external_lib_path.exists(), "{} does not exist".format(self.external_lib_path))
self.tc.assertTrue(
self.external_lib_path.exists(),
"{} does not exist".format(self.external_lib_path),
)
def create_cluster_file_with_wrong_port(self):
self.test_cluster_file = self.test_dir.joinpath("{}.cluster".format(random_alphanum_string(16)))
self.test_cluster_file = self.test_dir.joinpath(
"{}.cluster".format(random_alphanum_string(16))
)
port = self.cluster.port_provider.get_free_port()
with open(self.test_cluster_file, "w") as file:
file.write("abcde:fghijk@127.0.0.1:{}".format(port))
def create_invalid_cluster_file(self):
self.test_cluster_file = self.test_dir.joinpath("{}.cluster".format(random_alphanum_string(16)))
self.test_cluster_file = self.test_dir.joinpath(
"{}.cluster".format(random_alphanum_string(16))
)
port = self.cluster.port_provider.get_free_port()
with open(self.test_cluster_file, "w") as file:
file.write("abcde:fghijk@")
@ -149,7 +171,9 @@ class ClientConfigTest:
def check_available_clients(self, expected_clients):
self.tc.assertIsNotNone(self.status_json)
self.tc.assertTrue("AvailableClients" in self.status_json)
actual_clients = [client["ReleaseVersion"] for client in self.status_json["AvailableClients"]]
actual_clients = [
client["ReleaseVersion"] for client in self.status_json["AvailableClients"]
]
self.tc.assertEqual(set(expected_clients), set(actual_clients))
def check_protocol_version_not_set(self):
@ -220,7 +244,11 @@ class ClientConfigTest:
# ----------------------------
def exec(self):
cmd_args = [self.cluster.client_config_tester_bin, "--cluster-file", self.test_cluster_file]
cmd_args = [
self.cluster.client_config_tester_bin,
"--cluster-file",
self.test_cluster_file,
]
if self.tmp_dir is not None:
cmd_args += ["--tmp-dir", self.tmp_dir]
@ -247,7 +275,10 @@ class ClientConfigTest:
cmd_args += ["--network-option-fail_incompatible_client", ""]
if self.trace_file_identifier is not None:
cmd_args += ["--network-option-trace_file_identifier", self.trace_file_identifier]
cmd_args += [
"--network-option-trace_file_identifier",
self.trace_file_identifier,
]
if self.trace_initialize_on_setup:
cmd_args += ["--network-option-trace_initialize_on_setup", ""]
@ -267,8 +298,13 @@ class ClientConfigTest:
if self.print_status:
cmd_args += ["--print-status"]
print("\nExecuting test command: {}".format(" ".join([str(c) for c in cmd_args])), file=sys.stderr)
tester_proc = subprocess.Popen(cmd_args, stdout=subprocess.PIPE, stderr=sys.stderr)
print(
"\nExecuting test command: {}".format(" ".join([str(c) for c in cmd_args])),
file=sys.stderr,
)
tester_proc = subprocess.Popen(
cmd_args, stdout=subprocess.PIPE, stderr=sys.stderr
)
out, _ = tester_proc.communicate()
self.tc.assertEqual(0, tester_proc.returncode)
if self.print_status:
@ -276,7 +312,10 @@ class ClientConfigTest:
try:
self.status_json = json.loads(out)
except json.JSONDecodeError as e:
print("Error '{}' parsing output {}".format(e, out.decode()), file=sys.stderr)
print(
"Error '{}' parsing output {}".format(e, out.decode()),
file=sys.stderr,
)
self.tc.assertIsNotNone(self.status_json)
print("Status: ", self.status_json, file=sys.stderr)
else:
@ -336,12 +375,16 @@ class ClientConfigTests(unittest.TestCase):
# Multiple external clients, normal case
test = ClientConfigTest(self)
test.print_status = True
test.create_external_lib_dir([CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION])
test.create_external_lib_dir(
[CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION]
)
test.disable_local_client = True
test.api_version = api_version_from_str(PREV2_RELEASE_VERSION)
test.exec()
test.check_healthy_status_report()
test.check_available_clients([CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION])
test.check_available_clients(
[CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION]
)
test.check_current_client(CURRENT_VERSION)
def test_no_external_client_support_api_version(self):
@ -366,7 +409,9 @@ class ClientConfigTests(unittest.TestCase):
def test_one_external_client_wrong_api_version(self):
# Multiple external clients, API version unsupported by one of othem
test = ClientConfigTest(self)
test.create_external_lib_dir([CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION])
test.create_external_lib_dir(
[CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION]
)
test.disable_local_client = True
test.api_version = api_version_from_str(CURRENT_VERSION)
test.expected_error = 2204 # API function missing
@ -376,7 +421,9 @@ class ClientConfigTests(unittest.TestCase):
# Multiple external clients; API version unsupported by one of them; Ignore failures
test = ClientConfigTest(self)
test.print_status = True
test.create_external_lib_dir([CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION])
test.create_external_lib_dir(
[CURRENT_VERSION, PREV_RELEASE_VERSION, PREV2_RELEASE_VERSION]
)
test.disable_local_client = True
test.api_version = api_version_from_str(CURRENT_VERSION)
test.ignore_external_client_failures = True
@ -549,10 +596,17 @@ class ClientTracingTests(unittest.TestCase):
self.assertEqual(3, len(self.trace_files))
primary_trace = self.find_trace_file(with_ip=True)
self.find_and_check_event(primary_trace, "ClientStart", ["Machine"], [])
cur_ver_trace = self.find_trace_file(with_ip=True, version=CURRENT_VERSION, thread_idx=0)
cur_ver_trace = self.find_trace_file(
with_ip=True, version=CURRENT_VERSION, thread_idx=0
)
self.find_and_check_event(cur_ver_trace, "ClientStart", ["Machine"], [])
prev_ver_trace = self.find_trace_file(with_ip=True, version=PREV_RELEASE_VERSION, thread_idx=0)
self.find_and_check_event(prev_ver_trace, "ClientStart", ["Machine"], [])
prev_ver_trace = self.find_trace_file(
with_ip=True, version=PREV_RELEASE_VERSION, thread_idx=0
)
# there have been sporadic check failures in the trace check below, so we comment this out for the time being
# previous release version was likely not flushing trace correctly when network::stop() is called
# TODO: re-enable this check when we bump up PREV_RELEASE_VERSION to one where there is such a guarantee
# self.find_and_check_event(prev_ver_trace, "ClientStart", ["Machine"], [])
def test_default_config_error_case(self):
# Test that no trace files are created with a default configuration
@ -580,10 +634,14 @@ class ClientTracingTests(unittest.TestCase):
primary_trace = self.find_trace_file()
# The machine address will be available only in the second ClientStart event
self.find_and_check_event(primary_trace, "ClientStart", [], ["Machine"])
self.find_and_check_event(primary_trace, "ClientStart", ["Machine"], [], seqno=1)
self.find_and_check_event(
primary_trace, "ClientStart", ["Machine"], [], seqno=1
)
cur_ver_trace = self.find_trace_file(version=CURRENT_VERSION, thread_idx=0)
self.find_and_check_event(cur_ver_trace, "ClientStart", [], ["Machine"])
self.find_and_check_event(cur_ver_trace, "ClientStart", ["Machine"], [], seqno=1)
self.find_and_check_event(
cur_ver_trace, "ClientStart", ["Machine"], [], seqno=1
)
def test_init_on_setup_trace_error_case(self):
# Test trace files created with trace_initialize_on_setup option
@ -611,7 +669,9 @@ class ClientTracingTests(unittest.TestCase):
self.exec_test()
self.assertEqual(2, len(self.trace_files))
self.find_trace_file(with_ip=True, identifier="fdbclient")
self.find_trace_file(with_ip=True, identifier="fdbclient", version=CURRENT_VERSION, thread_idx=0)
self.find_trace_file(
with_ip=True, identifier="fdbclient", version=CURRENT_VERSION, thread_idx=0
)
def test_init_on_setup_and_trace_identifier(self):
# Test trace files created with trace_initialize_on_setup option
@ -626,7 +686,9 @@ class ClientTracingTests(unittest.TestCase):
self.exec_test()
self.assertEqual(2, len(self.trace_files))
self.find_trace_file(identifier="fdbclient")
self.find_trace_file(identifier="fdbclient", version=CURRENT_VERSION, thread_idx=0)
self.find_trace_file(
identifier="fdbclient", version=CURRENT_VERSION, thread_idx=0
)
# ---------------
# Helper methods
@ -652,7 +714,9 @@ class ClientTracingTests(unittest.TestCase):
events.append(json.loads(line))
self.trace_file_events[trace] = events
def find_trace_file(self, with_ip=False, identifier=None, version=None, thread_idx=None):
def find_trace_file(
self, with_ip=False, identifier=None, version=None, thread_idx=None
):
self.assertIsNotNone(self.trace_files)
for trace_file in self.trace_files:
name = os.path.basename(trace_file)
@ -676,7 +740,9 @@ class ClientTracingTests(unittest.TestCase):
return trace_file
self.fail("No maching trace file found")
def find_and_check_event(self, trace_file, event_type, attr_present, attr_missing, seqno=0):
def find_and_check_event(
self, trace_file, event_type, attr_present, attr_missing, seqno=0
):
self.assertTrue(trace_file in self.trace_file_events)
for event in self.trace_file_events[trace_file]:
if event["Type"] == event_type:

View File

@ -23,8 +23,9 @@ from multiprocessing import Pipe, Process
from typing import Union, List
from util import to_str, to_bytes, cleanup_tenant
class _admin_request(object):
def __init__(self, op: str, args: List[Union[str, bytes]]=[]):
def __init__(self, op: str, args: List[Union[str, bytes]] = []):
self.op = op
self.args = args
@ -34,6 +35,7 @@ class _admin_request(object):
def __repr__(self):
return f"admin_request({self.op}, {self.args})"
def main_loop(main_pipe, pipe):
main_pipe.close()
use_grv_cache = False
@ -85,9 +87,18 @@ def main_loop(main_pipe, pipe):
resp = Exception("db not open")
else:
tr = db.create_transaction()
del tr[b'':b'\xff']
del tr[b"":b"\xff"]
tr.commit().wait()
tenants = list(map(lambda x: x.key, list(fdb.tenant_management.list_tenants(db, b'', b'\xff', 0).to_list())))
tenants = list(
map(
lambda x: x.key,
list(
fdb.tenant_management.list_tenants(
db, b"", b"\xff", 0
).to_list()
),
)
)
for tenant in tenants:
fdb.tenant_management.delete_tenant(db, tenant)
elif op == "terminate":
@ -99,11 +110,14 @@ def main_loop(main_pipe, pipe):
resp = e
pipe.send(resp)
_admin_server = None
def get():
return _admin_server
# server needs to be a singleton running in subprocess, because FDB network layer (including active TLS config) is a global var
class Server(object):
def __init__(self):
@ -111,7 +125,9 @@ class Server(object):
assert _admin_server is None, "admin server may be setup once per process"
_admin_server = self
self._main_pipe, self._admin_pipe = Pipe(duplex=True)
self._admin_proc = Process(target=main_loop, args=(self._main_pipe, self._admin_pipe))
self._admin_proc = Process(
target=main_loop, args=(self._main_pipe, self._admin_pipe)
)
def start(self):
self._admin_proc.start()

View File

@ -33,21 +33,65 @@ from collections.abc import Callable
from multiprocessing import Process, Pipe
from typing import Union
from authz_util import token_gen, private_key_gen, public_keyset_from_keys, alg_from_kty
from util import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes, KeyFileReverter, wait_until_tenant_tr_succeeds, wait_until_tenant_tr_fails
from util import (
random_alphanum_str,
random_alphanum_bytes,
to_str,
to_bytes,
KeyFileReverter,
wait_until_tenant_tr_succeeds,
wait_until_tenant_tr_fails,
)
from test_util import ScopedTraceChecker
from local_cluster import TLSConfig
from tmp_cluster import TempCluster
special_key_ranges = [
# (description, range_begin, range_end, readable, writable)
("transaction description", b"\xff\xff/description", b"\xff\xff/description\x00", True, False),
(
"transaction description",
b"\xff\xff/description",
b"\xff\xff/description\x00",
True,
False,
),
("global knobs", b"\xff\xff/globalKnobs", b"\xff\xff/globalKnobs\x00", True, False),
("knobs", b"\xff\xff/knobs/", b"\xff\xff/knobs0\x00", True, False),
("conflicting keys", b"\xff\xff/transaction/conflicting_keys/", b"\xff\xff/transaction/conflicting_keys/\xff\xff", True, False),
("read conflict range", b"\xff\xff/transaction/read_conflict_range/", b"\xff\xff/transaction/read_conflict_range/\xff\xff", True, False),
("conflicting keys", b"\xff\xff/transaction/write_conflict_range/", b"\xff\xff/transaction/write_conflict_range/\xff\xff", True, False),
("data distribution stats", b"\xff\xff/metrics/data_distribution_stats/", b"\xff\xff/metrics/data_distribution_stats/\xff\xff", False, False),
("kill storage", b"\xff\xff/globals/killStorage", b"\xff\xff/globals/killStorage\x00", True, False),
(
"conflicting keys",
b"\xff\xff/transaction/conflicting_keys/",
b"\xff\xff/transaction/conflicting_keys/\xff\xff",
True,
False,
),
(
"read conflict range",
b"\xff\xff/transaction/read_conflict_range/",
b"\xff\xff/transaction/read_conflict_range/\xff\xff",
True,
False,
),
(
"conflicting keys",
b"\xff\xff/transaction/write_conflict_range/",
b"\xff\xff/transaction/write_conflict_range/\xff\xff",
True,
False,
),
(
"data distribution stats",
b"\xff\xff/metrics/data_distribution_stats/",
b"\xff\xff/metrics/data_distribution_stats/\xff\xff",
False,
False,
),
(
"kill storage",
b"\xff\xff/globals/killStorage",
b"\xff\xff/globals/killStorage\x00",
True,
False,
),
]
# handler for when looping is assumed with usage
@ -60,12 +104,14 @@ def loop_until_success(tr: fdb.Transaction, func):
except fdb.FDBError as e:
tr.on_error(e).wait()
# test that token option on a transaction should survive soft transaction resets,
# be cleared by hard transaction resets, and also clearable by setting empty value
def test_token_option(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
token = token_gen(cluster.private_key, token_claim_1h(default_tenant))
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(token)
def commit_some_value(tr):
tr[b"abc"] = b"def"
return tr.commit().wait()
@ -73,13 +119,15 @@ def test_token_option(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
loop_until_success(tr, commit_some_value)
# token option should survive a soft reset by a retryable error
tr.on_error(fdb.FDBError(1020)).wait() # not_committed (conflict)
tr.on_error(fdb.FDBError(1020)).wait() # not_committed (conflict)
def read_back_value(tr):
return tr[b"abc"].value
value = loop_until_success(tr, read_back_value)
assert value == b"def", f"unexpected value found: {value}"
tr.reset() # token shouldn't survive a hard reset
tr.reset() # token shouldn't survive a hard reset
try:
value = read_back_value(tr)
assert False, "expected permission_denied, but succeeded"
@ -88,7 +136,7 @@ def test_token_option(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
tr.reset()
tr.options.set_authorization_token(token)
tr.options.set_authorization_token() # option set with no arg should clear the token
tr.options.set_authorization_token() # option set with no arg should clear the token
try:
value = read_back_value(tr)
@ -96,7 +144,10 @@ def test_token_option(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
def test_simple_tenant_access(cluster, default_tenant, tenant_tr_gen, token_claim_1h, tenant_id_from_name):
def test_simple_tenant_access(
cluster, default_tenant, tenant_tr_gen, token_claim_1h, tenant_id_from_name
):
def check_token_usage_trace(trace_entries, token_claim, token_signature_part):
found = False
for filename, ev_type, entry in trace_entries:
@ -109,19 +160,36 @@ def test_simple_tenant_access(cluster, default_tenant, tenant_tr_gen, token_clai
if jti_actual == jti_expect and tenantid_actual == tenantid_expect:
found = True
else:
print(f"found unknown tenant in token usage audit log; tokenid={jti_actual} vs. {jti_expect}, tenantid={tenantid_actual} vs. {tenantid_expect}")
print(
f"found unknown tenant in token usage audit log; tokenid={jti_actual} vs. {jti_expect}, tenantid={tenantid_actual} vs. {tenantid_expect}"
)
for k, v in entry.items():
if k.find(token_signature_part) != -1 or v.find(token_signature_part) != -1:
pytest.fail(f"token usage trace includes sensitive token signature: key={k} value={v}")
if (
k.find(token_signature_part) != -1
or v.find(token_signature_part) != -1
):
pytest.fail(
f"token usage trace includes sensitive token signature: key={k} value={v}"
)
if not found:
pytest.fail("failed to find any AuditTokenUsed entry matching token from the testcase")
pytest.fail(
"failed to find any AuditTokenUsed entry matching token from the testcase"
)
token_claim = token_claim_1h(default_tenant)
token = token_gen(cluster.private_key, token_claim)
token_sig_part = to_str(token[token.rfind(b".") + 1:])
with ScopedTraceChecker(cluster, functools.partial(check_token_usage_trace, token_claim=token_claim, token_signature_part=token_sig_part)):
token_sig_part = to_str(token[token.rfind(b".") + 1 :])
with ScopedTraceChecker(
cluster,
functools.partial(
check_token_usage_trace,
token_claim=token_claim,
token_signature_part=token_sig_part,
),
):
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(token)
def commit_some_value(tr):
tr[b"abc"] = b"def"
tr.commit().wait()
@ -129,21 +197,28 @@ def test_simple_tenant_access(cluster, default_tenant, tenant_tr_gen, token_clai
loop_until_success(tr, commit_some_value)
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(token)
def read_back_value(tr):
return tr[b"abc"].value
return tr[b"abc"].value
value = loop_until_success(tr, read_back_value)
assert value == b"def", "tenant write transaction not visible"
def test_cross_tenant_access_disallowed(cluster, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h):
def test_cross_tenant_access_disallowed(
cluster, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h
):
# use default tenant token with second tenant transaction and see it fail
second_tenant = random_alphanum_bytes(12)
tenant_gen(second_tenant)
token_second = token_gen(cluster.private_key, token_claim_1h(second_tenant))
tr_second = tenant_tr_gen(second_tenant)
tr_second.options.set_authorization_token(token_second)
def commit_some_value(tr):
tr[b"abc"] = b"def"
return tr.commit().wait()
loop_until_success(tr_second, commit_some_value)
token_default = token_gen(cluster.private_key, token_claim_1h(default_tenant))
tr_second = tenant_tr_gen(second_tenant)
@ -151,7 +226,9 @@ def test_cross_tenant_access_disallowed(cluster, default_tenant, tenant_gen, ten
# test that read transaction fails
try:
value = tr_second[b"abc"].value
assert False, f"expected permission denied, but read transaction went through, value: {value}"
assert (
False
), f"expected permission denied, but read transaction went through, value: {value}"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
# test that write transaction fails
@ -164,7 +241,10 @@ def test_cross_tenant_access_disallowed(cluster, default_tenant, tenant_gen, ten
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
def test_cross_tenant_raw_access_disallowed_with_token(cluster, db, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h):
def test_cross_tenant_raw_access_disallowed_with_token(
cluster, db, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h
):
def commit_some_value(tr):
tr[b"abc"] = b"def"
return tr.commit().wait()
@ -195,13 +275,14 @@ def test_cross_tenant_raw_access_disallowed_with_token(cluster, db, default_tena
lhs = min(prefix_first, prefix_second)
rhs = max(prefix_first, prefix_second)
rhs = bytearray(rhs)
rhs[-1] += 1 # exclusive end
rhs[-1] += 1 # exclusive end
try:
value = tr[lhs:bytes(rhs)].to_list()
value = tr[lhs : bytes(rhs)].to_list()
assert False, f"expected permission_denied, but succeeded, value: {value}"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
def test_system_and_special_key_range_disallowed(db, tenant_tr_gen):
second_tenant = random_alphanum_bytes(12)
try:
@ -214,22 +295,37 @@ def test_system_and_special_key_range_disallowed(db, tenant_tr_gen):
tr = db.create_transaction()
tr.options.set_access_system_keys()
kvs = tr.get_range(b"\xff", b"\xff\xff", limit=1).to_list()
assert False, f"disallowed system keyspace read has succeeded. found item: {kvs}"
assert (
False
), f"disallowed system keyspace read has succeeded. found item: {kvs}"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
for range_name, special_range_begin, special_range_end, readable, _ in special_key_ranges:
for (
range_name,
special_range_begin,
special_range_end,
readable,
_,
) in special_key_ranges:
tr = db.create_transaction()
tr.options.set_access_system_keys()
tr.options.set_special_key_space_relaxed()
try:
kvs = tr.get_range(special_range_begin, special_range_end, limit=1).to_list()
kvs = tr.get_range(
special_range_begin, special_range_end, limit=1
).to_list()
if readable:
pass
else:
pytest.fail(f"disallowed special keyspace read for range '{range_name}' has succeeded. found item {kvs}")
pytest.fail(
f"disallowed special keyspace read for range '{range_name}' has succeeded. found item {kvs}"
)
except fdb.FDBError as e:
assert e.code in [6000, 6001], f"expected authz error from attempted read to range '{range_name}', got {e} instead"
assert e.code in [
6000,
6001,
], f"expected authz error from attempted read to range '{range_name}', got {e} instead"
try:
tr = db.create_transaction()
@ -240,7 +336,13 @@ def test_system_and_special_key_range_disallowed(db, tenant_tr_gen):
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
for range_name, special_range_begin, special_range_end, _, writable in special_key_ranges:
for (
range_name,
special_range_begin,
special_range_end,
_,
writable,
) in special_key_ranges:
tr = db.create_transaction()
tr.options.set_access_system_keys()
tr.options.set_special_key_space_relaxed()
@ -251,10 +353,14 @@ def test_system_and_special_key_range_disallowed(db, tenant_tr_gen):
if writable:
pass
else:
pytest.fail(f"write to disallowed special keyspace range '{range_name}' has succeeded")
pytest.fail(
f"write to disallowed special keyspace range '{range_name}' has succeeded"
)
except fdb.FDBError as e:
error_range = [6000, 6001, 2115] if not writable else []
assert e.code in error_range, f"expected errors {error_range} from attempted write to range '{range_name}', got {e} instead"
assert (
e.code in error_range
), f"expected errors {error_range} from attempted write to range '{range_name}', got {e} instead"
try:
tr = db.create_transaction()
@ -264,9 +370,16 @@ def test_system_and_special_key_range_disallowed(db, tenant_tr_gen):
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
def test_public_key_set_rollover(
kty, public_key_refresh_interval,
cluster, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h):
kty,
public_key_refresh_interval,
cluster,
default_tenant,
tenant_gen,
tenant_tr_gen,
token_claim_1h,
):
new_kid = random_alphanum_str(12)
new_kty = "EC" if kty == "RSA" else "RSA"
new_key = private_key_gen(kty=new_kty, kid=new_kid)
@ -289,66 +402,119 @@ def test_public_key_set_rollover(
with KeyFileReverter(cluster.public_key_json_file, old_key_json, delay):
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(interim_set)
wait_until_tenant_tr_succeeds(second_tenant, new_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_succeeds(
second_tenant, new_key, tenant_tr_gen, max_repeat, delay, token_claim_1h
)
print("interim key set activated")
final_set = public_keyset_from_keys([new_key])
print(f"final keyset: {final_set}")
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(final_set)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_fails(
default_tenant,
cluster.private_key,
tenant_tr_gen,
max_repeat,
delay,
token_claim_1h,
)
def test_public_key_set_broken_file_tolerance(
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h):
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h
):
delay = public_key_refresh_interval
# retry limit in waiting for keyset file update to propagate to FDB server's internal keyset
max_repeat = 10
with KeyFileReverter(cluster.public_key_json_file, cluster.public_key_jwks_str, delay):
with KeyFileReverter(
cluster.public_key_json_file, cluster.public_key_jwks_str, delay
):
# key file update should take effect even after witnessing broken key file
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(cluster.public_key_jwks_str.strip()[:10]) # make the file partial, injecting parse error
keyfile.write(
cluster.public_key_jwks_str.strip()[:10]
) # make the file partial, injecting parse error
time.sleep(delay * 2)
# should still work; internal key set only clears with a valid, empty key set file
tr_default = tenant_tr_gen(default_tenant)
tr_default.options.set_authorization_token(token_gen(cluster.private_key, token_claim_1h(default_tenant)))
tr_default.options.set_authorization_token(
token_gen(cluster.private_key, token_claim_1h(default_tenant))
)
tr_default[b"abc"] = b"def"
tr_default.commit().wait()
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write('{"keys":[]}')
# eventually internal key set will become empty and won't accept any new tokens
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_fails(
default_tenant,
cluster.private_key,
tenant_tr_gen,
max_repeat,
delay,
token_claim_1h,
)
def test_public_key_set_deletion_tolerance(
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h):
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h
):
delay = public_key_refresh_interval
# retry limit in waiting for keyset file update to propagate to FDB server's internal keyset
max_repeat = 10
with KeyFileReverter(cluster.public_key_json_file, cluster.public_key_jwks_str, delay):
with KeyFileReverter(
cluster.public_key_json_file, cluster.public_key_jwks_str, delay
):
# key file update should take effect even after witnessing deletion of key file
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write('{"keys":[]}')
time.sleep(delay)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_fails(
default_tenant,
cluster.private_key,
tenant_tr_gen,
max_repeat,
delay,
token_claim_1h,
)
os.remove(cluster.public_key_json_file)
time.sleep(delay * 2)
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(cluster.public_key_jwks_str)
# eventually updated key set should take effect and transaction should be accepted
wait_until_tenant_tr_succeeds(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_succeeds(
default_tenant,
cluster.private_key,
tenant_tr_gen,
max_repeat,
delay,
token_claim_1h,
)
def test_public_key_set_empty_file_tolerance(
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h):
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h
):
delay = public_key_refresh_interval
# retry limit in waiting for keyset file update to propagate to FDB server's internal keyset
max_repeat = 10
with KeyFileReverter(cluster.public_key_json_file, cluster.public_key_jwks_str, delay):
with KeyFileReverter(
cluster.public_key_json_file, cluster.public_key_jwks_str, delay
):
# key file update should take effect even after witnessing an empty file
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write('{"keys":[]}')
# eventually internal key set will become empty and won't accept any new tokens
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_fails(
default_tenant,
cluster.private_key,
tenant_tr_gen,
max_repeat,
delay,
token_claim_1h,
)
# empty the key file
with open(cluster.public_key_json_file, "w") as keyfile:
pass
@ -356,7 +522,15 @@ def test_public_key_set_empty_file_tolerance(
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(cluster.public_key_jwks_str)
# eventually key file should update and transactions should go through
wait_until_tenant_tr_succeeds(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
wait_until_tenant_tr_succeeds(
default_tenant,
cluster.private_key,
tenant_tr_gen,
max_repeat,
delay,
token_claim_1h,
)
def test_bad_token(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
def del_attr(d, attr):
@ -371,10 +545,18 @@ def test_bad_token(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
("no nbf", lambda claim: del_attr(claim, "nbf"), "NoNotBefore"),
("no exp", lambda claim: del_attr(claim, "exp"), "NoExpirationTime"),
("no iat", lambda claim: del_attr(claim, "iat"), "NoIssuedAt"),
("too early", lambda claim: set_attr(claim, "nbf", time.time() + 30), "TokenNotYetValid"),
(
"too early",
lambda claim: set_attr(claim, "nbf", time.time() + 30),
"TokenNotYetValid",
),
("too late", lambda claim: set_attr(claim, "exp", time.time() - 10), "Expired"),
("no tenants", lambda claim: del_attr(claim, "tenants"), "NoTenants"),
("empty tenants", lambda claim: set_attr(claim, "tenants", []), "TenantTokenMismatch"),
(
"empty tenants",
lambda claim: set_attr(claim, "tenants", []),
"TenantTokenMismatch",
),
]
def check_invalid_token_trace(trace_entries, expected_reason, case_name):
@ -386,65 +568,113 @@ def test_bad_token(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
if actual_reason == expected_reason:
invalid_token_found = True
else:
print("InvalidToken reason mismatch: expected '{}' got '{}'".format(expected_reason, actual_reason))
print(
"InvalidToken reason mismatch: expected '{}' got '{}'".format(
expected_reason, actual_reason
)
)
print("trace entry: {}".format(entry.items()))
elif ev_type == "UnauthorizedAccessPrevented":
unauthorized_access_found = True
if not invalid_token_found:
pytest.fail("Failed to find invalid token reason '{}' in trace for case '{}'".format(expected_reason, case_name))
pytest.fail(
"Failed to find invalid token reason '{}' in trace for case '{}'".format(
expected_reason, case_name
)
)
if not unauthorized_access_found:
pytest.fail("Failed to find 'UnauthorizedAccessPrevented' event in trace for case '{}'".format(case_name))
pytest.fail(
"Failed to find 'UnauthorizedAccessPrevented' event in trace for case '{}'".format(
case_name
)
)
for case_name, mutation, expected_failure_reason in claim_mutations:
with ScopedTraceChecker(cluster, functools.partial(check_invalid_token_trace, expected_reason=expected_failure_reason, case_name=case_name)) as checker:
with ScopedTraceChecker(
cluster,
functools.partial(
check_invalid_token_trace,
expected_reason=expected_failure_reason,
case_name=case_name,
),
) as checker:
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(token_gen(cluster.private_key, mutation(token_claim_1h(default_tenant))))
tr.options.set_authorization_token(
token_gen(cluster.private_key, mutation(token_claim_1h(default_tenant)))
)
print(f"Trace check begin for '{case_name}': {checker.begin}")
try:
value = tr[b"abc"].value
assert False, f"expected permission_denied for case '{case_name}', but read transaction went through"
assert (
False
), f"expected permission_denied for case '{case_name}', but read transaction went through"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied for case '{case_name}', got {e} instead"
assert (
e.code == 6000
), f"expected permission_denied for case '{case_name}', got {e} instead"
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(token_gen(cluster.private_key, mutation(token_claim_1h(default_tenant))))
tr.options.set_authorization_token(
token_gen(cluster.private_key, mutation(token_claim_1h(default_tenant)))
)
tr[b"abc"] = b"def"
try:
tr.commit().wait()
assert False, f"expected permission_denied for case '{case_name}', but write transaction went through"
assert (
False
), f"expected permission_denied for case '{case_name}', but write transaction went through"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied for case '{case_name}', got {e} instead"
assert (
e.code == 6000
), f"expected permission_denied for case '{case_name}', got {e} instead"
print(f"Trace check end for '{case_name}': {time.time()}")
with ScopedTraceChecker(cluster, functools.partial(check_invalid_token_trace, expected_reason="UnknownKey", case_name="untrusted key")):
with ScopedTraceChecker(
cluster,
functools.partial(
check_invalid_token_trace,
expected_reason="UnknownKey",
case_name="untrusted key",
),
):
# unknown key case: override "kid" field in header
# first, update only the kid field of key with export-update-import
key_dict = cluster.private_key.as_dict(is_private=True)
key_dict["kid"] = random_alphanum_str(10)
renamed_key = authlib.jose.JsonWebKey.import_key(key_dict)
unknown_key_token = token_gen(
renamed_key,
token_claim_1h(default_tenant),
headers={
"typ": "JWT",
"kty": renamed_key.kty,
"alg": alg_from_kty(renamed_key.kty),
"kid": renamed_key.kid,
})
renamed_key,
token_claim_1h(default_tenant),
headers={
"typ": "JWT",
"kty": renamed_key.kty,
"alg": alg_from_kty(renamed_key.kty),
"kid": renamed_key.kid,
},
)
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(unknown_key_token)
try:
value = tr[b"abc"].value
assert False, f"expected permission_denied for 'unknown key' case, but read transaction went through"
assert (
False
), f"expected permission_denied for 'unknown key' case, but read transaction went through"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied for 'unknown key' case, got {e} instead"
assert (
e.code == 6000
), f"expected permission_denied for 'unknown key' case, got {e} instead"
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(unknown_key_token)
tr[b"abc"] = b"def"
try:
tr.commit().wait()
assert False, f"expected permission_denied for 'unknown key' case, but write transaction went through"
assert (
False
), f"expected permission_denied for 'unknown key' case, but write transaction went through"
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied for 'unknown key' case, got {e} instead"
assert (
e.code == 6000
), f"expected permission_denied for 'unknown key' case, got {e} instead"
def test_authz_not_enabled_trace(build_dir):
# spin up a cluster without authz and see it logs as expected
@ -464,12 +694,13 @@ def test_authz_not_enabled_trace(build_dir):
pytest.fail(f"failed to locate tokenless mode trace '{keyfile_unset_ev}'")
with TempCluster(
build_dir=build_dir,
tls_config = TLSConfig(server_chain_len=3, client_chain_len=2),
authorization_kty = "", # this ensures that no public key files are generated and produces AuthzPublicKeyFileNotSet
remove_at_exit=True,
custom_config={
"knob-allow-tokenless-tenant-access": "true",
}) as cluster:
build_dir=build_dir,
tls_config=TLSConfig(server_chain_len=3, client_chain_len=2),
authorization_kty="", # this ensures that no public key files are generated and produces AuthzPublicKeyFileNotSet
remove_at_exit=True,
custom_config={
"knob-allow-tokenless-tenant-access": "true",
},
) as cluster:
cluster.add_trace_check(check_authz_disablement_traces)
# safe to drop cluster immediately. TempCluster.__enter__ returns only after fdbcli "create database" succeeds.

View File

@ -37,64 +37,88 @@ fdb.api_version(fdb.LATEST_API_VERSION)
cluster_scope = "module"
def pytest_addoption(parser):
parser.addoption(
"--build-dir", action="store", dest="build_dir", help="FDB build directory", required=True)
"--build-dir",
action="store",
dest="build_dir",
help="FDB build directory",
required=True,
)
parser.addoption(
"--kty", action="store", choices=["EC", "RSA"], default="EC", dest="kty", help="Token signature algorithm")
"--kty",
action="store",
choices=["EC", "RSA"],
default="EC",
dest="kty",
help="Token signature algorithm",
)
parser.addoption(
"--trusted-client",
action="store_true",
default=False,
dest="trusted_client",
help="Whether client shall be configured trusted, i.e. mTLS-ready")
"--trusted-client",
action="store_true",
default=False,
dest="trusted_client",
help="Whether client shall be configured trusted, i.e. mTLS-ready",
)
parser.addoption(
"--public-key-refresh-interval",
action="store",
default=1,
dest="public_key_refresh_interval",
help="How frequently server refreshes authorization public key file")
"--public-key-refresh-interval",
action="store",
default=1,
dest="public_key_refresh_interval",
help="How frequently server refreshes authorization public key file",
)
parser.addoption(
"--force-multi-version-client",
action="store_true",
default=False,
dest="force_multi_version_client",
help="Whether to force multi-version client mode")
"--force-multi-version-client",
action="store_true",
default=False,
dest="force_multi_version_client",
help="Whether to force multi-version client mode",
)
parser.addoption(
"--use-grv-cache",
action="store_true",
default=False,
dest="use_grv_cache",
help="Whether to make client use cached GRV from database context")
"--use-grv-cache",
action="store_true",
default=False,
dest="use_grv_cache",
help="Whether to make client use cached GRV from database context",
)
@pytest.fixture(scope="session")
def build_dir(request):
return request.config.option.build_dir
@pytest.fixture(scope="session")
def kty(request):
return request.config.option.kty
@pytest.fixture(scope="session")
def trusted_client(request):
return request.config.option.trusted_client
@pytest.fixture(scope="session")
def public_key_refresh_interval(request):
return request.config.option.public_key_refresh_interval
@pytest.fixture(scope="session")
def force_multi_version_client(request):
return request.config.option.force_multi_version_client
@pytest.fixture(scope="session")
def use_grv_cache(request):
return request.config.option.use_grv_cache
@pytest.fixture(scope="session")
def kid():
return random_alphanum_str(12)
@pytest.fixture(scope=cluster_scope)
def admin_ipc():
server = admin_server.Server()
@ -102,18 +126,27 @@ def admin_ipc():
yield server
server.join()
@pytest.fixture(autouse=True, scope=cluster_scope)
def cluster(admin_ipc, build_dir, public_key_refresh_interval, trusted_client, force_multi_version_client, use_grv_cache):
def cluster(
admin_ipc,
build_dir,
public_key_refresh_interval,
trusted_client,
force_multi_version_client,
use_grv_cache,
):
cluster_creation_time = time.time()
with TempCluster(
build_dir=build_dir,
tls_config=TLSConfig(server_chain_len=3, client_chain_len=2),
authorization_kty="EC",
authorization_keypair_id="authz-key",
remove_at_exit=True,
custom_config={
"knob-public-key-file-refresh-interval-seconds": public_key_refresh_interval,
}) as cluster:
build_dir=build_dir,
tls_config=TLSConfig(server_chain_len=3, client_chain_len=2),
authorization_kty="EC",
authorization_keypair_id="authz-key",
remove_at_exit=True,
custom_config={
"knob-public-key-file-refresh-interval-seconds": public_key_refresh_interval,
},
) as cluster:
keyfile = str(cluster.client_key_file)
certfile = str(cluster.client_cert_file)
cafile = str(cluster.server_ca_file)
@ -125,14 +158,20 @@ def cluster(admin_ipc, build_dir, public_key_refresh_interval, trusted_client, f
fdb.options.set_trace_file_identifier("testclient")
if force_multi_version_client:
fdb.options.set_disable_client_bypass()
admin_ipc.request("configure_client", [force_multi_version_client, use_grv_cache, logdir])
admin_ipc.request(
"configure_client", [force_multi_version_client, use_grv_cache, logdir]
)
admin_ipc.request("configure_tls", [keyfile, certfile, cafile])
admin_ipc.request("connect", [str(cluster.cluster_file)])
def check_no_invalid_traces(entries):
for filename, ev_type, entry in entries:
if ev_type.startswith("InvalidAuditLogType_"):
pytest.fail("Invalid audit log detected in file {}: {}".format(filename, entry.items()))
pytest.fail(
"Invalid audit log detected in file {}: {}".format(
filename, entry.items()
)
)
cluster.add_trace_check(check_no_invalid_traces)
@ -142,21 +181,35 @@ def cluster(admin_ipc, build_dir, public_key_refresh_interval, trusted_client, f
apply_trace_time = None
bad_trace_time = None
for filename, ev_type, entry in entries:
if apply_trace_time is None and ev_type == keyset_apply_ev_type and int(entry.attrib["NumPublicKeys"]) > 0:
if (
apply_trace_time is None
and ev_type == keyset_apply_ev_type
and int(entry.attrib["NumPublicKeys"]) > 0
):
apply_trace_time = float(entry.attrib["Time"])
if bad_trace_time is None and ev_type == bad_ev_type:
bad_trace_found = float(entry.attrib["Time"])
if apply_trace_time is None:
pytest.fail(f"failed to find '{keyset_apply_ev_type}' event with >0 public keys")
pytest.fail(
f"failed to find '{keyset_apply_ev_type}' event with >0 public keys"
)
else:
print(f"'{keyset_apply_ev_type}' found at {apply_trace_time - cluster_creation_time}s since cluster creation")
print(
f"'{keyset_apply_ev_type}' found at {apply_trace_time - cluster_creation_time}s since cluster creation"
)
if bad_trace_time is not None:
pytest.fail(f"unexpected '{bad_ev_type}' trace found at {bad_trace_time}")
pytest.fail(
f"unexpected '{bad_ev_type}' trace found at {bad_trace_time}"
)
cluster.add_trace_check(functools.partial(check_public_keyset_apply, cluster_creation_time=cluster_creation_time))
cluster.add_trace_check(
functools.partial(
check_public_keyset_apply, cluster_creation_time=cluster_creation_time
)
)
def check_connection_traces(entries, look_for_untrusted):
trusted_conns_traced = False # admin connections
trusted_conns_traced = False # admin connections
untrusted_conns_traced = False
ev_target = "IncomingConnection"
for _, ev_type, entry in entries:
@ -165,27 +218,42 @@ def cluster(admin_ipc, build_dir, public_key_refresh_interval, trusted_client, f
from_addr = entry.attrib["FromAddr"]
client_ip, port, tls_suffix = from_addr.split(":")
if tls_suffix != "tls":
pytest.fail(f"{ev_target} trace entry's FromAddr does not have a valid ':tls' suffix: found '{tls_suffix}'")
pytest.fail(
f"{ev_target} trace entry's FromAddr does not have a valid ':tls' suffix: found '{tls_suffix}'"
)
try:
ip = ipaddress.ip_address(client_ip)
except ValueError as e:
pytest.fail(f"{ev_target} trace entry's FromAddr '{client_ip}' has an invalid IP format: {e}")
pytest.fail(
f"{ev_target} trace entry's FromAddr '{client_ip}' has an invalid IP format: {e}"
)
if trusted == "1":
trusted_conns_traced = True
elif trusted == "0":
untrusted_conns_traced = True
else:
pytest.fail(f"{ev_target} trace entry's Trusted field has an unexpected value: {trusted}")
pytest.fail(
f"{ev_target} trace entry's Trusted field has an unexpected value: {trusted}"
)
if look_for_untrusted and not untrusted_conns_traced:
pytest.fail("failed to find any 'IncomingConnection' traces for untrusted clients")
pytest.fail(
"failed to find any 'IncomingConnection' traces for untrusted clients"
)
if not trusted_conns_traced:
pytest.fail("failed to find any 'IncomingConnection' traces for trusted clients")
pytest.fail(
"failed to find any 'IncomingConnection' traces for trusted clients"
)
cluster.add_trace_check(functools.partial(check_connection_traces, look_for_untrusted=not trusted_client))
cluster.add_trace_check(
functools.partial(
check_connection_traces, look_for_untrusted=not trusted_client
)
)
yield cluster
@pytest.fixture
def db(cluster, admin_ipc):
db = fdb.open(str(cluster.cluster_file))
@ -194,20 +262,25 @@ def db(cluster, admin_ipc):
admin_ipc.request("cleanup_database")
db = None
@pytest.fixture
def tenant_gen(db, admin_ipc):
def fn(tenant):
tenant = to_bytes(tenant)
admin_ipc.request("create_tenant", [tenant])
return fn
@pytest.fixture
def tenant_del(db, admin_ipc):
def fn(tenant):
tenant = to_str(tenant)
admin_ipc.request("delete_tenant", [tenant])
return fn
@pytest.fixture
def default_tenant(tenant_gen, tenant_del):
tenant = random_alphanum_bytes(8)
@ -215,6 +288,7 @@ def default_tenant(tenant_gen, tenant_del):
yield tenant
tenant_del(tenant)
@pytest.fixture
def tenant_tr_gen(db, use_grv_cache):
def fn(tenant):
@ -223,15 +297,19 @@ def tenant_tr_gen(db, use_grv_cache):
if use_grv_cache:
tr.options.set_use_grv_cache()
return tr
return fn
@pytest.fixture
def tenant_id_from_name(db):
def fn(tenant_name):
tenant = db.open_tenant(to_bytes(tenant_name))
return tenant.get_id().wait() # returns int
return tenant.get_id().wait() # returns int
return fn
@pytest.fixture
def token_claim_1h(tenant_id_from_name):
# JWT claim that is valid for 1 hour since time of invocation
@ -248,4 +326,5 @@ def token_claim_1h(tenant_id_from_name):
"jti": random_alphanum_str(10),
"tenants": [to_str(base64.b64encode(tenant_id.to_bytes(8, "big")))],
}
return fn

View File

@ -6,33 +6,39 @@ import time
from typing import Union
from authz_util import token_gen
def to_str(s: Union[str, bytes]):
if isinstance(s, bytes):
s = s.decode("utf8")
return s
def to_bytes(s: Union[str, bytes]):
if isinstance(s, str):
s = s.encode("utf8")
return s
def random_alphanum_str(k: int):
return ''.join(random.choices(string.ascii_letters + string.digits, k=k))
return "".join(random.choices(string.ascii_letters + string.digits, k=k))
def random_alphanum_bytes(k: int):
return random_alphanum_str(k).encode("ascii")
def cleanup_tenant(db, tenant_name):
try:
tenant = db.open_tenant(tenant_name)
del tenant[:]
fdb.tenant_management.delete_tenant(db, tenant_name)
except fdb.FDBError as e:
if e.code == 2131: # tenant not found
if e.code == 2131: # tenant not found
pass
else:
raise
class KeyFileReverter(object):
def __init__(self, filename: str, content: str, refresh_delay: int):
self.filename = filename
@ -45,12 +51,17 @@ class KeyFileReverter(object):
def __exit__(self, exc_type, exc_value, exc_traceback):
with open(self.filename, "w") as keyfile:
keyfile.write(self.content)
print(f"key file reverted. waiting {self.refresh_delay * 2} seconds for the update to take effect...")
print(
f"key file reverted. waiting {self.refresh_delay * 2} seconds for the update to take effect..."
)
time.sleep(self.refresh_delay * 2)
# repeat try-wait loop up to max_repeat times until both read and write tr fails for tenant with permission_denied
# important: only use this function if you don't have any data dependencies to key "abc"
def wait_until_tenant_tr_fails(tenant, private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h):
def wait_until_tenant_tr_fails(
tenant, private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h
):
repeat = 0
read_blocked = False
write_blocked = False
@ -59,7 +70,9 @@ def wait_until_tenant_tr_fails(tenant, private_key, tenant_tr_gen, max_repeat, d
tr = tenant_tr_gen(tenant)
# a token needs to be generated at every iteration because once it is accepted/cached,
# it will pass verification by caching until it expires
tr.options.set_authorization_token(token_gen(private_key, token_claim_1h(tenant)))
tr.options.set_authorization_token(
token_gen(private_key, token_claim_1h(tenant))
)
try:
if not read_blocked:
value = tr[b"abc"].value
@ -79,11 +92,16 @@ def wait_until_tenant_tr_fails(tenant, private_key, tenant_tr_gen, max_repeat, d
write_blocked = True
if not write_blocked:
repeat += 1
assert repeat < max_repeat, f"tenant transaction did not start to fail in {max_repeat * delay} seconds"
assert (
repeat < max_repeat
), f"tenant transaction did not start to fail in {max_repeat * delay} seconds"
# repeat try-wait loop up to max_repeat times until both read and write tr succeeds for tenant
# important: only use this function if you don't have any data dependencies to key "abc"
def wait_until_tenant_tr_succeeds(tenant, private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h):
def wait_until_tenant_tr_succeeds(
tenant, private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h
):
repeat = 0
token = token_gen(private_key, token_claim_1h(tenant))
while repeat < max_repeat:
@ -98,4 +116,6 @@ def wait_until_tenant_tr_succeeds(tenant, private_key, tenant_tr_gen, max_repeat
except fdb.FDBError as e:
assert e.code == 6000, f"expected permission_denied, got {e} instead"
repeat += 1
assert repeat < max_repeat, f"tenant transaction did not start to succeed in {max_repeat * delay} seconds"
assert (
repeat < max_repeat
), f"tenant transaction did not start to succeed in {max_repeat * delay} seconds"