Add admin server for proxying management ops
This commit is contained in:
parent
e37bc29dc2
commit
b72f6e39df
|
@ -417,7 +417,7 @@ if(WITH_PYTHON)
|
|||
|
||||
set(authz_test_cmd "")
|
||||
string(APPEND authz_test_cmd "${authz_venv_activate} && ")
|
||||
string(APPEND authz_test_cmd "LD_LIBRARY_PATH=${CMAKE_BINARY_DIR}/lib pytest ${CMAKE_SOURCE_DIR}/tests/authorization/authz_test.py --build-dir ${CMAKE_BINARY_DIR}")
|
||||
string(APPEND authz_test_cmd "LD_LIBRARY_PATH=${CMAKE_BINARY_DIR}/lib ${authz_venv_dir}/bin/python3 ${CMAKE_SOURCE_DIR}/tests/authorization/authz_test.py --build-dir ${CMAKE_BINARY_DIR}")
|
||||
add_test(
|
||||
NAME token_based_tenant_authorization
|
||||
WORKING_DIRECTORY ${authz_venv_dir}
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
from multiprocessing import Pipe, Process
|
||||
from typing import Union, List
|
||||
from util import to_str, to_bytes, cleanup_tenant
|
||||
|
||||
class _admin_request(object):
|
||||
def __init__(self, op: str, args: List[Union[str, bytes]]=[]):
|
||||
self.op = op
|
||||
self.args = args
|
||||
|
||||
def __str__(self):
|
||||
return f"admin_request({self.op}, {self.args})"
|
||||
|
||||
def __repr__(self):
|
||||
return f"admin_request({self.op}, {self.args})"
|
||||
|
||||
def _admin_loop(pipe):
|
||||
db = None
|
||||
while True:
|
||||
req = pipe.recv()
|
||||
if not isinstance(req, request):
|
||||
pipe.send(TypeError("unexpected type {}".format(type(req))))
|
||||
continue
|
||||
op = req.op
|
||||
args = req.args
|
||||
resp = True
|
||||
try:
|
||||
if op == "connect":
|
||||
fdb.options.set_trace_enable("admin_server_as_fdb_client")
|
||||
db = fdb.open(req.args[0])
|
||||
elif op == "configure_tls":
|
||||
keyfile, certfile, cafile = req.args[:3]
|
||||
fdb.options.set_tls_key_path(keyfile)
|
||||
fdb.options.set_tls_cert_path(certfile)
|
||||
fdb.options.set_tls_ca_path(cafile)
|
||||
elif op == "create_tenant":
|
||||
if db is None:
|
||||
resp = Exception("db not open")
|
||||
else:
|
||||
for tenant in req.args:
|
||||
tenant_str = to_str(tenant)
|
||||
tenant_bytes = to_bytes(tenant)
|
||||
fdb.tenant_management.create_tenant(db, tenant_bytes)
|
||||
print("created tenant: {}".format(tenant_str))
|
||||
elif op == "delete_tenant":
|
||||
if db is None:
|
||||
resp = Exception("db not open")
|
||||
else:
|
||||
for tenant in req.args:
|
||||
tenant_str = to_str(tenant)
|
||||
tenant_bytes = to_bytes(tenant)
|
||||
cleanup_tenant(tenant_bytes)
|
||||
print("deleted tenant: {}".format(tenant_str))
|
||||
elif op == "cleanup_database":
|
||||
if db is None:
|
||||
resp Exception("db not open")
|
||||
else:
|
||||
print("initiating database cleanup")
|
||||
tr = db.create_transaction()
|
||||
del tr[b"\\x00":b"\\xff"]
|
||||
tr.commit().wait()
|
||||
print("global keyspace cleared, deleting tenants")
|
||||
tenants = fdb.tenant_management.list_tenants(db)
|
||||
print("active tenants: {}".format(map(to_str, tenants)))
|
||||
for tenant in tenants:
|
||||
fdb.tenant_management.delete_tenant(db, tenant)
|
||||
print("tenant {} deleted".format(to_str(tenant)))
|
||||
else:
|
||||
resp = ValueError("unknown operation: {}".format(req))
|
||||
except EOFError:
|
||||
print("test process has closed pipe to admin. exiting.")
|
||||
break
|
||||
except Exception as e:
|
||||
resp = e
|
||||
pipe.send(resp)
|
||||
|
||||
_admin_server = None
|
||||
|
||||
def get():
|
||||
return _admin_server
|
||||
|
||||
class AdminServer(object):
|
||||
def __init__(self):
|
||||
assert __name__ == "__main__"
|
||||
assert _admin_server is None, "admin server may be setup once per process"
|
||||
self._main_pipe, self._admin_pipe = Pipe()
|
||||
|
||||
def start(self):
|
||||
self._admin_proc = Process(target=_admin_loop, args=(admin_pipe,))
|
||||
|
||||
def join(self):
|
||||
self._main_pipe.close()
|
||||
self._admin_proc.join()
|
||||
|
||||
def __enter__(self):
|
||||
self.start()
|
||||
|
||||
def __exit__(self):
|
||||
self.join()
|
||||
|
||||
def request(op, args=[]):
|
||||
req = _admin_request(op, args)
|
||||
try:
|
||||
self._main_pipe.send(req)
|
||||
resp = self._main_pipe.recv()
|
||||
if resp != True:
|
||||
print("Request {} failed: {}".format(req, resp))
|
||||
raise resp
|
||||
else:
|
||||
print("Request {} succeeded".format(req))
|
||||
except ExceptionBase as e:
|
||||
print("admin request {} failed by exception: {}".format(req, e))
|
||||
raise
|
|
@ -3,11 +3,12 @@ import argparse
|
|||
import fdb
|
||||
import pytest
|
||||
import time
|
||||
from fdb import FDBError
|
||||
import sys
|
||||
import fdb
|
||||
from multiprocessing import Process, Pipe
|
||||
from typing import Union
|
||||
from conftest import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes
|
||||
|
||||
fdb.api_version(720)
|
||||
from util import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes
|
||||
from admin_server import AdminServer
|
||||
|
||||
def token_claim_1h(tenant_name):
|
||||
now = time.time()
|
||||
|
@ -48,7 +49,7 @@ def test_cross_tenant_access_disallowed(default_tenant, token_gen, tenant_gen, t
|
|||
try:
|
||||
value = tr_second[b"abc"].value()
|
||||
assert False, "expected permission denied, but read transaction went through, value: {}".format(value)
|
||||
except FDBError as e:
|
||||
except fdb.FDBError as e:
|
||||
assert e.code == 6000, "expected permission_denied, got {} instead".format(e)
|
||||
# test that write transaction fails
|
||||
tr_second = tenant_tr_gen(second_tenant)
|
||||
|
@ -57,5 +58,10 @@ def test_cross_tenant_access_disallowed(default_tenant, token_gen, tenant_gen, t
|
|||
tr_second[b"def"] = b"ghi"
|
||||
tr_second.commit().wait()
|
||||
assert False, "expected permission denied, but write transaction went through"
|
||||
except FDBError as e:
|
||||
except fdb.FDBError as e:
|
||||
assert e.code == 6000, "expected permission_denied, got {} instead".format(e)
|
||||
|
||||
if __name__ == "__main__":
|
||||
fdb.api_version(720)
|
||||
with AdminServer() as server:
|
||||
sys.exit(pytest.main(args=sys.argv[1:]))
|
||||
|
|
|
@ -1,26 +1,15 @@
|
|||
import pytest
|
||||
import fdb
|
||||
import random
|
||||
import string
|
||||
import subprocess
|
||||
import admin_server
|
||||
from authlib.jose import JsonWebKey, KeySet, jwt
|
||||
from local_cluster import TLSConfig
|
||||
from tmp_cluster import TempCluster
|
||||
from typing import Union
|
||||
from util import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes
|
||||
|
||||
fdb.api_version(720)
|
||||
cluster_scope = "module"
|
||||
|
||||
def to_str(s: Union[str, bytes]):
|
||||
if isinstance(s, bytes):
|
||||
s = s.decode("utf8")
|
||||
return s
|
||||
|
||||
def to_bytes(s: Union[str, bytes]):
|
||||
if isinstance(s, str):
|
||||
s = s.encode("utf8")
|
||||
return s
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption(
|
||||
"--build-dir", action="store", dest="build_dir", help="FDB build directory", required=True)
|
||||
|
@ -33,12 +22,6 @@ def pytest_addoption(parser):
|
|||
dest="trusted_client",
|
||||
help="Whether client shall be configured trusted, i.e. mTLS-ready")
|
||||
|
||||
def random_alphanum_str(k: int):
|
||||
return ''.join(random.choices(string.ascii_letters + string.digits, k=k))
|
||||
|
||||
def random_alphanum_bytes(k: int):
|
||||
return random_alphanum_str(k).encode("ascii")
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def build_dir(request):
|
||||
return request.config.option.build_dir
|
||||
|
@ -98,40 +81,49 @@ def cluster(build_dir, public_key_jwks_str, trusted_client):
|
|||
public_key_json_str=public_key_jwks_str,
|
||||
remove_at_exit=True,
|
||||
custom_config={"code-probes": "all"}) as cluster:
|
||||
fdb.options.set_tls_key_path(str(cluster.client_key_file) if trusted_client else "")
|
||||
fdb.options.set_tls_cert_path(str(cluster.client_cert_file) if trusted_client else "")
|
||||
fdb.options.set_tls_ca_path(str(cluster.server_ca_file))
|
||||
keyfile = str(cluster.client_key_file)
|
||||
certfile = str(cluster.client_cert_file)
|
||||
cafile = str(cluster.server_ca_file)
|
||||
fdb.options.set_tls_key_path(keyfile if trusted_client else "")
|
||||
fdb.options.set_tls_cert_path(certfile if trusted_client else "")
|
||||
fdb.options.set_tls_ca_path(cafile)
|
||||
fdb.options.set_trace_enable()
|
||||
admin = admin_server.get()
|
||||
admin.request("configure_tls", [keyfile, certfile, cafile])
|
||||
admin.request("connect")
|
||||
yield cluster
|
||||
|
||||
@pytest.fixture(scope=cluster_scope)
|
||||
@pytest.fixture
|
||||
def db(cluster):
|
||||
db = fdb.open(str(cluster.cluster_file))
|
||||
db.options.set_transaction_timeout(2000) # 2 seconds
|
||||
db.options.set_transaction_retry_limit(3)
|
||||
return db
|
||||
yield db
|
||||
admin_server.get().request("cleanup_database")
|
||||
db = None
|
||||
|
||||
@pytest.fixture(scope=cluster_scope)
|
||||
def tenant_gen(cluster):
|
||||
@pytest.fixture
|
||||
def tenant_gen(db):
|
||||
def fn(tenant):
|
||||
cluster.fdbcli_exec("createtenant {}".format(to_str(tenant)))
|
||||
tenant = to_bytes(tenant)
|
||||
admin_server.get().request("create_tenant", [tenant])
|
||||
return fn
|
||||
|
||||
@pytest.fixture(scope=cluster_scope)
|
||||
def tenant_del(cluster):
|
||||
@pytest.fixture
|
||||
def tenant_del(db):
|
||||
def fn(tenant):
|
||||
tenant = to_str(tenant)
|
||||
cluster.fdbcli_exec("writemode on;usetenant {};clearrange \\x00 \\xff;defaulttenant;deletetenant {}".format(tenant, tenant))
|
||||
admin_server.get().request("delete_tenant", [tenant])
|
||||
return fn
|
||||
|
||||
@pytest.fixture(scope=cluster_scope)
|
||||
@pytest.fixture
|
||||
def default_tenant(tenant_gen, tenant_del):
|
||||
tenant = random_alphanum_bytes(8)
|
||||
tenant_gen(tenant)
|
||||
yield tenant
|
||||
tenant_del(tenant)
|
||||
|
||||
@pytest.fixture(scope=cluster_scope)
|
||||
@pytest.fixture
|
||||
def tenant_tr_gen(db):
|
||||
def fn(tenant):
|
||||
tenant = db.open_tenant(to_bytes(tenant))
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
import random
|
||||
import string
|
||||
import fdb
|
||||
|
||||
def to_str(s: Union[str, bytes]):
|
||||
if isinstance(s, bytes):
|
||||
s = s.decode("utf8")
|
||||
return s
|
||||
|
||||
def to_bytes(s: Union[str, bytes]):
|
||||
if isinstance(s, str):
|
||||
s = s.encode("utf8")
|
||||
return s
|
||||
|
||||
def random_alphanum_str(k: int):
|
||||
return ''.join(random.choices(string.ascii_letters + string.digits, k=k))
|
||||
|
||||
def random_alphanum_bytes(k: int):
|
||||
return random_alphanum_str(k).encode("ascii")
|
||||
|
||||
def cleanup_tenant(db, tenant_name):
|
||||
try:
|
||||
tenant = db.open_tenant(tenant_name)
|
||||
del tenant[:]
|
||||
fdb.tenant_management.delete_tenant(db, tenant_name)
|
||||
except fdb.FDBError as e:
|
||||
if e.code == 2131: # tenant not found
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
|
Loading…
Reference in New Issue