Replace AuthZ's use of tenant names in token with tenant ID

Also, to minimize audit log loss, handle token usage audit logging at each usage.
This has a side-effect of making the token use log less bursty.
This also subtly changes the dedup cache policy.
Dedup time window used to be 5 seconds (default) since the start of batch-logging.
Now it's 5 seconds from the first usage since the closing of the previous dedup window
This commit is contained in:
Junhyun Shim 2023-02-03 15:43:58 +01:00
parent b911bcfce2
commit ce652fa284
18 changed files with 310 additions and 193 deletions

View File

@ -1685,6 +1685,10 @@ int Arguments::validate() {
}
if (enable_token_based_authorization) {
if (num_fdb_clusters > 1) {
logr.error("for simplicity, --enable_token_based_authorization must be used with exactly one fdb cluster");
return -1;
}
if (active_tenants <= 0 || total_tenants <= 0) {
logr.error("--enable_token_based_authorization must be used with at least one tenant");
return -1;
@ -1708,14 +1712,54 @@ bool Arguments::isAuthorizationEnabled() const noexcept {
private_key_pem.has_value();
}
void Arguments::collectTenantIds() {
auto db = Database(cluster_files[0]);
tenant_ids.clear();
tenant_ids.reserve(active_tenants);
for (auto tenant_idx = 0; tenant_idx < active_tenants; tenant_idx++) {
auto tenant_name = getTenantNameByIndex(tenant_idx);
auto tenant = db.openTenant(toBytesRef(tenant_name));
while (true) {
auto f = tenant.getId();
if (auto err = f.blockUntilReady()) {
logr.error("error while waiting for tenant id of tenant name '{}': {}", tenant_name, err.what());
throwError("ERROR: Tenant::getId().blockUntilReady(): ", err);
}
if (auto err = f.error()) {
if (err.retryable()) {
logr.info(
"get tenant id for tenant name '{}' returned a retryable error: {}", tenant_name, err.what());
continue;
} else {
logr.error("get tenant id for tenant name '{}' returned an unretryable error: {}",
tenant_name,
err.what());
throwError("ERROR: Tenant::getId() returned an unretryable error: ", err);
}
} else {
tenant_ids.push_back(f.get());
break;
}
}
}
}
void Arguments::generateAuthorizationTokens() {
assert(active_tenants > 0);
assert(keypair_id.has_value());
assert(private_key_pem.has_value());
authorization_tokens.clear();
logr.info("generating authorization tokens to be used by worker threads");
assert(num_fdb_clusters == 1);
// assumes tenants have already been populated
logr.info("collecting tenant ids");
auto stopwatch = Stopwatch(StartAtCtor{});
authorization_tokens = generateAuthorizationTokenMap(active_tenants, keypair_id.value(), private_key_pem.value());
collectTenantIds();
logr.info(
"collected IDs of {} tenants in {:6.3f} seconds", active_tenants, toDoubleSeconds(stopwatch.stop().diff()));
logr.info("generating authorization tokens to be used by worker threads");
stopwatch.start();
authorization_tokens =
generateAuthorizationTokenMap(active_tenants, keypair_id.value(), private_key_pem.value(), tenant_ids);
assert(authorization_tokens.size() == active_tenants);
logr.info("generated {} tokens in {:6.3f} seconds", active_tenants, toDoubleSeconds(stopwatch.stop().diff()));
}

View File

@ -143,10 +143,12 @@ constexpr const int MAX_REPORT_FILES = 200;
struct Arguments {
Arguments();
int validate();
void collectTenantIds();
bool isAuthorizationEnabled() const noexcept;
void generateAuthorizationTokens();
// Needs to be called once per fdb-accessing process
// Needs to be called once per fdb client process from a clean state:
// i.e. no FDB API called
int setGlobalOptions() const;
bool isAnyTimeoutEnabled() const;
@ -206,6 +208,7 @@ struct Arguments {
std::optional<std::string> keypair_id;
std::optional<std::string> private_key_pem;
std::map<std::string, std::string> authorization_tokens; // maps tenant name to token string
std::vector<int64_t> tenant_ids; // maps tenant index to tenant id for signing tokens
int transaction_timeout_db;
int transaction_timeout_tx;
};

View File

@ -28,7 +28,8 @@ namespace mako {
std::map<std::string, std::string> generateAuthorizationTokenMap(int num_tenants,
std::string public_key_id,
std::string private_key_pem) {
std::string private_key_pem,
const std::vector<int64_t>& tenant_ids) {
std::map<std::string, std::string> m;
auto t = authz::jwt::stdtypes::TokenSpec{};
auto const now = toIntegerSeconds(std::chrono::system_clock::now().time_since_epoch());
@ -40,14 +41,14 @@ std::map<std::string, std::string> generateAuthorizationTokenMap(int num_tenants
t.issuedAtUnixTime = now;
t.expiresAtUnixTime = now + 60 * 60 * 12; // Good for 12 hours
t.notBeforeUnixTime = now - 60 * 5; // activated 5 mins ago
const int tokenIdLen = 36; // UUID length
auto tokenId = std::string(tokenIdLen, '\0');
const int tokenid_len = 36; // UUID length
auto tokenid = std::string(tokenid_len, '\0');
for (auto i = 0; i < num_tenants; i++) {
std::string tenant_name = getTenantNameByIndex(i);
// swap out only the token ids and tenant names
randomAlphanumString(tokenId.data(), tokenIdLen);
t.tokenId = tokenId;
t.tenants = std::vector<std::string>{ tenant_name };
randomAlphanumString(tokenid.data(), tokenid_len);
t.tokenId = tokenid;
t.tenants = std::vector<int64_t>{ tenant_ids[i] };
m[tenant_name] = authz::jwt::stdtypes::signToken(t, private_key_pem);
}
return m;

View File

@ -21,6 +21,7 @@
#include <cassert>
#include <map>
#include <string>
#include <vector>
#include "fdb_api.hpp"
#include "utils.hpp"
@ -28,7 +29,8 @@ namespace mako {
std::map<std::string, std::string> generateAuthorizationTokenMap(int tenants,
std::string public_key_id,
std::string private_key_pem);
std::string private_key_pem,
const std::vector<int64_t>& tenant_ids);
inline std::string getTenantNameByIndex(int index) {
assert(index >= 0);

View File

@ -22,6 +22,8 @@
#include "flow/actorcompiler.h" // has to be last include
using authz::TenantId;
template <class Key, class Value>
class LRUCache {
public:
@ -132,28 +134,26 @@ TEST_CASE("/fdbrpc/authz/LRUCache") {
struct CacheEntry {
Arena arena;
VectorRef<TenantNameRef> tenants;
VectorRef<TenantId> tenants;
Optional<StringRef> tokenId;
double expirationTime = 0.0;
};
struct AuditEntry {
NetworkAddress address;
TenantId tenantId;
Optional<Standalone<StringRef>> tokenId;
explicit AuditEntry(NetworkAddress const& address, CacheEntry const& cacheEntry)
: address(address),
bool operator==(const AuditEntry& other) const noexcept = default;
explicit AuditEntry(NetworkAddress const& address, TenantId tenantId, CacheEntry const& cacheEntry)
: address(address), tenantId(tenantId),
tokenId(cacheEntry.tokenId.present() ? Standalone<StringRef>(cacheEntry.tokenId.get(), cacheEntry.arena)
: Optional<Standalone<StringRef>>()) {}
};
bool operator==(AuditEntry const& lhs, AuditEntry const& rhs) {
return (lhs.address == rhs.address) && (lhs.tokenId.present() == rhs.tokenId.present()) &&
(!lhs.tokenId.present() || lhs.tokenId.get() == rhs.tokenId.get());
}
std::size_t hash_value(AuditEntry const& value) {
std::size_t seed = 0;
boost::hash_combine(seed, value.address);
boost::hash_combine(seed, value.tenantId);
if (value.tokenId.present()) {
boost::hash_combine(seed, value.tokenId.get());
}
@ -161,38 +161,17 @@ std::size_t hash_value(AuditEntry const& value) {
}
struct TokenCacheImpl {
TokenCacheImpl();
LRUCache<StringRef, CacheEntry> cache;
boost::unordered_set<AuditEntry> usedTokens;
Future<Void> auditor;
TokenCacheImpl();
double lastResetTime;
bool validate(TenantNameRef tenant, StringRef token);
bool validate(TenantId tenantId, StringRef token);
bool validateAndAdd(double currentTime, StringRef token, NetworkAddress const& peer);
void logTokenUsage(double currentTime, AuditEntry&& entry);
};
ACTOR Future<Void> tokenCacheAudit(TokenCacheImpl* self) {
state boost::unordered_set<AuditEntry> audits;
state boost::unordered_set<AuditEntry>::iterator iter;
state double lastLoggedTime = 0;
loop {
auto const timeSinceLog = g_network->timer() - lastLoggedTime;
if (timeSinceLog < FLOW_KNOBS->AUDIT_TIME_WINDOW) {
wait(delay(FLOW_KNOBS->AUDIT_TIME_WINDOW - timeSinceLog));
}
lastLoggedTime = g_network->timer();
audits.swap(self->usedTokens);
for (iter = audits.begin(); iter != audits.end(); ++iter) {
CODE_PROBE(true, "Audit Logging Running");
TraceEvent("AuditTokenUsed").detail("Client", iter->address).detail("TokenId", iter->tokenId).log();
wait(yield());
}
audits.clear();
}
}
TokenCacheImpl::TokenCacheImpl() : cache(FLOW_KNOBS->TOKEN_CACHE_SIZE) {
auditor = tokenCacheAudit(this);
}
TokenCacheImpl::TokenCacheImpl() : cache(FLOW_KNOBS->TOKEN_CACHE_SIZE), usedTokens(), lastResetTime(0) {}
TokenCache::TokenCache() : impl(new TokenCacheImpl()) {}
TokenCache::~TokenCache() {
@ -207,8 +186,8 @@ TokenCache& TokenCache::instance() {
return *reinterpret_cast<TokenCache*>(g_network->global(INetwork::enTokenCache));
}
bool TokenCache::validate(TenantNameRef name, StringRef token) {
return impl->validate(name, token);
bool TokenCache::validate(TenantId tenantId, StringRef token) {
return impl->validate(tenantId, token);
}
#define TRACE_INVALID_PARSED_TOKEN(reason, token) \
@ -280,8 +259,8 @@ bool TokenCacheImpl::validateAndAdd(double currentTime, StringRef token, Network
CacheEntry c;
c.expirationTime = t.expiresAtUnixTime.get();
c.tenants.reserve(c.arena, t.tenants.get().size());
for (auto tenant : t.tenants.get()) {
c.tenants.push_back_deep(c.arena, tenant);
for (auto tenantId : t.tenants.get()) {
c.tenants.push_back(c.arena, tenantId);
}
if (t.tokenId.present()) {
c.tokenId = StringRef(c.arena, t.tokenId.get());
@ -291,7 +270,7 @@ bool TokenCacheImpl::validateAndAdd(double currentTime, StringRef token, Network
}
}
bool TokenCacheImpl::validate(TenantNameRef name, StringRef token) {
bool TokenCacheImpl::validate(TenantId tenantId, StringRef token) {
NetworkAddress peer = FlowTransport::transport().currentDeliveryPeerAddress();
auto cachedEntry = cache.get(token);
double currentTime = g_network->timer();
@ -314,21 +293,43 @@ bool TokenCacheImpl::validate(TenantNameRef name, StringRef token) {
}
bool tenantFound = false;
for (auto const& t : entry->tenants) {
if (t == name) {
if (t == tenantId) {
tenantFound = true;
break;
}
}
if (!tenantFound) {
CODE_PROBE(true, "Valid token doesn't reference tenant");
TraceEvent(SevWarn, "TenantTokenMismatch").detail("From", peer).detail("Tenant", name.toString());
TraceEvent(SevWarn, "TenantTokenMismatch")
.detail("From", peer)
.detail("RequestedTenant", fmt::format("{:#x}", tenantId))
.detail("TenantsInToken", fmt::format("{:#x}", fmt::join(entry->tenants, " ")));
return false;
}
// audit logging
usedTokens.insert(AuditEntry(peer, *cachedEntry.get()));
logTokenUsage(currentTime, AuditEntry(peer, tenantId, *cachedEntry.get()));
return true;
}
void TokenCacheImpl::logTokenUsage(double currentTime, AuditEntry&& entry) {
if (currentTime > lastResetTime + FLOW_KNOBS->AUDIT_TIME_WINDOW) {
// clear usage cache every AUDIT_TIME_WINDOW seconds
usedTokens.clear();
lastResetTime = currentTime;
}
auto [iter, inserted] = usedTokens.insert(std::move(entry));
if (inserted) {
// access in the context of this (client_ip, tenant, token_id) tuple hasn't been logged in current window. log
// usage.
CODE_PROBE(true, "Audit Logging Running");
TraceEvent("AuditTokenUsed")
.detail("Client", iter->address)
.detail("TenantId", fmt::format("{:#x}", iter->tenantId))
.detail("TokenId", iter->tokenId)
.log();
}
}
namespace authz::jwt {
extern TokenRef makeRandomTokenSpec(Arena&, IRandom&, authz::Algorithm);
}
@ -375,9 +376,9 @@ TEST_CASE("/fdbrpc/authz/TokenCache/BadTokens") {
},
{
[](Arena& arena, IRandom&, authz::jwt::TokenRef& token) {
StringRef* newTenants = new (arena) StringRef[1];
*newTenants = token.tenants.get()[0].substr(1);
token.tenants = VectorRef<StringRef>(newTenants, 1);
TenantId* newTenants = new (arena) TenantId[1];
*newTenants = token.tenants.get()[0] + 1;
token.tenants = VectorRef<TenantId>(newTenants, 1);
},
"UnmatchedTenant",
},
@ -443,15 +444,15 @@ TEST_CASE("/fdbrpc/authz/TokenCache/BadTokens") {
}
}
}
if (TokenCache::instance().validate("TenantNameDontMatterHere"_sr, StringRef())) {
if (TokenCache::instance().validate(TenantInfo::INVALID_TENANT, StringRef())) {
fmt::print("Unexpected successful validation of ill-formed token (no signature part)\n");
ASSERT(false);
}
if (TokenCache::instance().validate("TenantNameDontMatterHere"_sr, "1111.22"_sr)) {
if (TokenCache::instance().validate(TenantInfo::INVALID_TENANT, "1111.22"_sr)) {
fmt::print("Unexpected successful validation of ill-formed token (no signature part)\n");
ASSERT(false);
}
if (TokenCache::instance().validate("TenantNameDontMatterHere2"_sr, "////.////.////"_sr)) {
if (TokenCache::instance().validate(TenantInfo::INVALID_TENANT, "////.////.////"_sr)) {
fmt::print("Unexpected successful validation of unparseable token\n");
ASSERT(false);
}

View File

@ -52,11 +52,11 @@
namespace {
// test-only constants for generating random tenant/key names
// test-only constants for generating random tenant ID and key names
constexpr int MinIssuerNameLen = 16;
constexpr int MaxIssuerNameLenPlus1 = 25;
constexpr int MinTenantNameLen = 8;
constexpr int MaxTenantNameLenPlus1 = 17;
constexpr authz::TenantId MinTenantId = 1;
constexpr authz::TenantId MaxTenantIdPlus1 = 0xffffffffll;
constexpr int MinKeyNameLen = 10;
constexpr int MaxKeyNameLenPlus1 = 21;
@ -176,6 +176,14 @@ void appendField(fmt::memory_buffer& b, char const (&name)[NameLen], Optional<Fi
fmt::format_to(bi, fmt::runtime(f[i].toStringView()));
}
fmt::format_to(bi, "]");
} else if constexpr (std::is_same_v<FieldType, VectorRef<TenantId>>) {
fmt::format_to(bi, " {}=[", name);
for (auto i = 0; i < f.size(); i++) {
if (i)
fmt::format_to(bi, ",");
fmt::format_to(bi, "{:#x}", f[i]);
}
fmt::format_to(bi, "]");
} else if constexpr (std::is_same_v<FieldType, StringRef>) {
fmt::format_to(bi, " {}={}", name, f.toStringView());
} else {
@ -202,33 +210,34 @@ StringRef toStringRef(Arena& arena, const TokenRef& tokenSpec) {
return StringRef(str, buf.size());
}
template <class FieldType, class Writer, bool MakeStringArrayBase64 = false>
void putField(Optional<FieldType> const& field,
Writer& wr,
const char* fieldName,
std::bool_constant<MakeStringArrayBase64> _ = std::bool_constant<false>{}) {
template <class FieldType, class Writer>
void putField(Optional<FieldType> const& field, Writer& wr, const char* fieldName) {
if (!field.present())
return;
wr.Key(fieldName);
auto const& value = field.get();
static_assert(std::is_same_v<StringRef, FieldType> || std::is_same_v<FieldType, uint64_t> ||
std::is_same_v<FieldType, VectorRef<StringRef>>);
std::is_same_v<FieldType, VectorRef<StringRef>> || std::is_same_v<FieldType, VectorRef<TenantId>>);
if constexpr (std::is_same_v<StringRef, FieldType>) {
wr.String(reinterpret_cast<const char*>(value.begin()), value.size());
} else if constexpr (std::is_same_v<FieldType, uint64_t>) {
wr.Uint64(value);
} else if constexpr (std::is_same_v<FieldType, VectorRef<TenantId>>) {
// "tenants" array = array of base64-encoded tenant key prefix
// key prefix = bytestring representation of big-endian tenant ID (int64_t)
Arena arena;
wr.StartArray();
for (auto elem : value) {
auto const bigEndianId = bigEndian64(elem);
auto encodedElem =
base64::encode(arena, StringRef(reinterpret_cast<const uint8_t*>(&bigEndianId), sizeof(bigEndianId)));
wr.String(reinterpret_cast<const char*>(encodedElem.begin()), encodedElem.size());
}
wr.EndArray();
} else {
wr.StartArray();
if constexpr (MakeStringArrayBase64) {
Arena arena;
for (auto elem : value) {
auto encodedElem = base64::encode(arena, elem);
wr.String(reinterpret_cast<const char*>(encodedElem.begin()), encodedElem.size());
}
} else {
for (auto elem : value) {
wr.String(reinterpret_cast<const char*>(elem.begin()), elem.size());
}
for (auto elem : value) {
wr.String(reinterpret_cast<const char*>(elem.begin()), elem.size());
}
wr.EndArray();
}
@ -259,7 +268,7 @@ StringRef makeSignInput(Arena& arena, const TokenRef& tokenSpec) {
putField(tokenSpec.expiresAtUnixTime, payload, "exp");
putField(tokenSpec.notBeforeUnixTime, payload, "nbf");
putField(tokenSpec.tokenId, payload, "jti");
putField(tokenSpec.tenants, payload, "tenants", std::bool_constant<true>{} /* encode tenants in base64 */);
putField(tokenSpec.tenants, payload, "tenants");
payload.EndObject();
auto const headerPartLen = base64::url::encodedLength(headerBuffer.GetSize());
auto const payloadPartLen = base64::url::encodedLength(payloadBuffer.GetSize());
@ -347,18 +356,17 @@ Optional<StringRef> parseHeaderPart(Arena& arena, TokenRef& token, StringRef b64
return {};
}
template <class FieldType, bool ExpectBase64StringArray = false>
template <class FieldType>
Optional<StringRef> parseField(Arena& arena,
Optional<FieldType>& out,
const rapidjson::Document& d,
const char* fieldName,
std::bool_constant<ExpectBase64StringArray> _ = std::bool_constant<false>{}) {
const char* fieldName) {
auto fieldItr = d.FindMember(fieldName);
if (fieldItr == d.MemberEnd())
return {};
auto const& field = fieldItr->value;
static_assert(std::is_same_v<StringRef, FieldType> || std::is_same_v<FieldType, uint64_t> ||
std::is_same_v<FieldType, VectorRef<StringRef>>);
std::is_same_v<FieldType, VectorRef<StringRef>> || std::is_same_v<FieldType, VectorRef<TenantId>>);
if constexpr (std::is_same_v<FieldType, StringRef>) {
if (!field.IsString()) {
return StringRef(arena, fmt::format("'{}' is not a string", fieldName));
@ -369,7 +377,7 @@ Optional<StringRef> parseField(Arena& arena,
return StringRef(arena, fmt::format("'{}' is not a number", fieldName));
}
out = static_cast<uint64_t>(field.GetDouble());
} else {
} else if constexpr (std::is_same_v<FieldType, VectorRef<StringRef>>) {
if (!field.IsArray()) {
return StringRef(arena, fmt::format("'{}' is not an array", fieldName));
}
@ -379,26 +387,50 @@ Optional<StringRef> parseField(Arena& arena,
if (!field[i].IsString()) {
return StringRef(arena, fmt::format("{}th element of '{}' is not a string", i + 1, fieldName));
}
if constexpr (ExpectBase64StringArray) {
Optional<StringRef> decodedString = base64::decode(
arena,
StringRef(reinterpret_cast<const uint8_t*>(field[i].GetString()), field[i].GetStringLength()));
if (decodedString.present()) {
vector[i] = decodedString.get();
} else {
CODE_PROBE(true, "Base64 token field has failed to be parsed");
return StringRef(arena,
fmt::format("Failed to base64-decode {}th element of '{}'", i + 1, fieldName));
}
} else {
vector[i] = StringRef(
arena, reinterpret_cast<const uint8_t*>(field[i].GetString()), field[i].GetStringLength());
}
vector[i] = StringRef(
arena, reinterpret_cast<const uint8_t*>(field[i].GetString()), field[i].GetStringLength());
}
out = VectorRef<StringRef>(vector, field.Size());
} else {
out = VectorRef<StringRef>();
}
} else {
// tenant ids case: convert array of base64-encoded length-8 bytestring containing big-endian int64_t to
// local-endian int64_t
if (!field.IsArray()) {
return StringRef(arena, fmt::format("'{}' is not an array", fieldName));
}
if (field.Size() > 0) {
auto vector = new (arena) TenantId[field.Size()];
for (auto i = 0; i < field.Size(); i++) {
if (!field[i].IsString()) {
return StringRef(arena, fmt::format("{}th element of '{}' is not a string", i + 1, fieldName));
}
Optional<StringRef> decodedString = base64::decode(
arena,
StringRef(reinterpret_cast<const uint8_t*>(field[i].GetString()), field[i].GetStringLength()));
if (decodedString.present()) {
auto const tenantPrefix = decodedString.get();
if (tenantPrefix.size() != sizeof(TenantId)) {
CODE_PROBE(true, "Tenant prefix has an invalid length");
return StringRef(arena,
fmt::format("{}th element of '{}' has an invalid bytewise length of {}",
i + 1,
fieldName,
tenantPrefix.size()));
}
TenantId tenantId = *reinterpret_cast<const TenantId*>(tenantPrefix.begin());
vector[i] = fromBigEndian64(tenantId);
} else {
CODE_PROBE(true, "Tenant field has failed to be parsed");
return StringRef(arena,
fmt::format("Failed to base64-decode {}th element of '{}'", i + 1, fieldName));
}
}
out = VectorRef<TenantId>(vector, field.Size());
} else {
out = VectorRef<TenantId>();
}
}
return {};
}
@ -431,12 +463,7 @@ Optional<StringRef> parsePayloadPart(Arena& arena, TokenRef& token, StringRef b6
return err;
if ((err = parseField(arena, token.notBeforeUnixTime, d, "nbf")).present())
return err;
if ((err = parseField(arena,
token.tenants,
d,
"tenants",
std::bool_constant<true>{} /* expect field elements encoded in base64 */))
.present())
if ((err = parseField(arena, token.tenants, d, "tenants")).present())
return err;
return {};
}
@ -526,16 +553,16 @@ TokenRef makeRandomTokenSpec(Arena& arena, IRandom& rng, Algorithm alg) {
auto numAudience = rng.randomInt(1, 5);
auto aud = new (arena) StringRef[numAudience];
for (auto i = 0; i < numAudience; i++)
aud[i] = genRandomAlphanumStringRef(arena, rng, MinTenantNameLen, MaxTenantNameLenPlus1);
aud[i] = genRandomAlphanumStringRef(arena, rng, MinIssuerNameLen, MaxIssuerNameLenPlus1);
ret.audience = VectorRef<StringRef>(aud, numAudience);
ret.issuedAtUnixTime = g_network->timer();
ret.notBeforeUnixTime = ret.issuedAtUnixTime.get();
ret.expiresAtUnixTime = ret.issuedAtUnixTime.get() + rng.randomInt(360, 1080 + 1);
auto numTenants = rng.randomInt(1, 3);
auto tenants = new (arena) StringRef[numTenants];
auto tenants = new (arena) TenantId[numTenants];
for (auto i = 0; i < numTenants; i++)
tenants[i] = genRandomAlphanumStringRef(arena, rng, MinTenantNameLen, MaxTenantNameLenPlus1);
ret.tenants = VectorRef<StringRef>(tenants, numTenants);
tenants[i] = rng.randomInt64(MinTenantId, MaxTenantIdPlus1);
ret.tenants = VectorRef<TenantId>(tenants, numTenants);
return ret;
}
@ -584,8 +611,7 @@ TEST_CASE("/fdbrpc/TokenSign/JWT") {
ASSERT(verifyOk);
}
// try tampering with signed token by adding one more tenant
tokenSpec.tenants.get().push_back(
arena, genRandomAlphanumStringRef(arena, rng, MinTenantNameLen, MaxTenantNameLenPlus1));
tokenSpec.tenants.get().push_back(arena, rng.randomInt64(MinTenantId, MaxTenantIdPlus1));
auto tamperedTokenPart = makeSignInput(arena, tokenSpec);
auto tamperedTokenString = fmt::format("{}.{}", tamperedTokenPart.toString(), signaturePart.toString());
std::tie(verifyOk, verifyErr) = authz::jwt::verifyToken(StringRef(tamperedTokenString), publicKey);
@ -608,12 +634,12 @@ TEST_CASE("/fdbrpc/TokenSign/JWT/ToStringRef") {
t.notBeforeUnixTime = 789ul;
t.keyId = "keyId"_sr;
t.tokenId = "tokenId"_sr;
StringRef tenants[2]{ "tenant1"_sr, "tenant2"_sr };
t.tenants = VectorRef<StringRef>(tenants, 2);
authz::TenantId tenants[2]{ 0x1ll, 0xabcdefabcdefll };
t.tenants = VectorRef<authz::TenantId>(tenants, 2);
auto arena = Arena();
auto tokenStr = toStringRef(arena, t);
auto tokenStrExpected =
"alg=ES256 kid=keyId iss=issuer sub=subject aud=[aud1,aud2,aud3] iat=123 exp=456 nbf=789 jti=tokenId tenants=[tenant1,tenant2]"_sr;
"alg=ES256 kid=keyId iss=issuer sub=subject aud=[aud1,aud2,aud3] iat=123 exp=456 nbf=789 jti=tokenId tenants=[0x1,0xabcdefabcdef]"_sr;
if (tokenStr != tokenStrExpected) {
fmt::print("Expected: {}\nGot : {}\n", tokenStrExpected.toStringView(), tokenStr.toStringView());
ASSERT(false);

View File

@ -29,38 +29,48 @@ namespace {
// converts std::optional<STANDARD_TYPE(S)> to Optional<FLOW_TYPE(T)>
template <class T, class S>
void convertAndAssign(Arena& arena, Optional<T>& to, const std::optional<S>& from) {
if (!from.has_value()) {
to.reset();
return;
}
if constexpr (std::is_same_v<S, std::vector<std::string>>) {
static_assert(std::is_same_v<T, VectorRef<StringRef>>,
"Source type std::vector<std::string> must convert to VectorRef<StringRef>");
if (from.has_value()) {
const auto& value = from.value();
if (value.empty()) {
to = VectorRef<StringRef>();
} else {
// no need to deep copy string because we have the underlying memory for the duration of token signing.
auto buf = new (arena) StringRef[value.size()];
for (auto i = 0u; i < value.size(); i++) {
buf[i] = StringRef(value[i]);
}
to = VectorRef<StringRef>(buf, value.size());
const auto& value = from.value();
if (value.empty()) {
to = VectorRef<StringRef>();
} else {
// no need to deep copy string because we have the underlying memory for the duration of token signing.
auto buf = new (arena) StringRef[value.size()];
for (auto i = 0u; i < value.size(); i++) {
buf[i] = StringRef(value[i]);
}
to = VectorRef<StringRef>(buf, value.size());
}
} else if constexpr (std::is_same_v<S, std::vector<int64_t>>) {
static_assert(std::is_same_v<T, VectorRef<int64_t>>,
"Source type std::vector<int64_t> must convert to VectorRef<int64_t>");
const auto& value = from.value();
if (value.empty()) {
to = VectorRef<int64_t>();
} else {
auto buf = new (arena) int64_t[value.size()];
for (auto i = 0; i < value.size(); i++)
buf[i] = value[i];
to = VectorRef<int64_t>(buf, value.size());
}
} else if constexpr (std::is_same_v<S, std::string>) {
static_assert(std::is_same_v<T, StringRef>, "Source type std::string must convert to StringRef");
if (from.has_value()) {
const auto& value = from.value();
// no need to deep copy string because we have the underlying memory for the duration of token signing.
to = StringRef(value);
}
const auto& value = from.value();
// no need to deep copy string because we have the underlying memory for the duration of token signing.
to = StringRef(value);
} else {
static_assert(
std::is_same_v<S, T>,
"Source types that aren't std::vector<std::string> or std::string must have the same destination type");
static_assert(std::is_same_v<S, T>,
"Source types that aren't std::vector<std::string>, std::vector<int64_t>, or std::string must "
"have the same destination type");
static_assert(std::is_trivially_copy_assignable_v<S>,
"Source types that aren't std::vector<std::string> or std::string must not use heap memory");
if (from.has_value()) {
to = from.value();
}
to = from.value();
}
}

View File

@ -76,9 +76,7 @@ struct serializable_traits<TenantInfo> : std::true_type {
if constexpr (Archiver::isDeserializing) {
bool tenantAuthorized = FLOW_KNOBS->ALLOW_TOKENLESS_TENANT_ACCESS;
if (!tenantAuthorized && v.tenantId != TenantInfo::INVALID_TENANT && v.token.present()) {
// TODO: update tokens to be ID based
// tenantAuthorized = TokenCache::instance().validate(v.tenantId, v.token.get());
tenantAuthorized = true;
tenantAuthorized = TokenCache::instance().validate(v.tenantId, v.token.get());
}
v.trusted = FlowTransport::transport().currentDeliveryPeerIsTrusted();
v.tenantAuthorized = tenantAuthorized;

View File

@ -21,6 +21,7 @@
#ifndef TOKENCACHE_H_
#define TOKENCACHE_H_
#include "fdbrpc/TenantName.h"
#include "fdbrpc/TokenSpec.h"
#include "flow/Arena.h"
class TokenCache : NonCopyable {
@ -31,7 +32,7 @@ public:
~TokenCache();
static void createInstance();
static TokenCache& instance();
bool validate(TenantNameRef tenant, StringRef token);
bool validate(authz::TenantId tenant, StringRef token);
};
#endif // TOKENCACHE_H_

View File

@ -28,6 +28,8 @@
namespace authz {
using TenantId = int64_t;
enum class Algorithm : int {
RS256,
ES256,
@ -67,7 +69,7 @@ struct BasicTokenSpec {
OptionalType<uint64_t> expiresAtUnixTime; // exp
OptionalType<uint64_t> notBeforeUnixTime; // nbf
OptionalType<StringType> tokenId; // jti
OptionalType<VectorType<StringType>> tenants; // tenants
OptionalType<VectorType<TenantId>> tenants; // tenants
// signature part
StringType signature;
};

View File

@ -346,7 +346,7 @@ public:
double checkDisabled(const std::string& desc) const;
// generate authz token for use in simulation environment
Standalone<StringRef> makeToken(StringRef tenantName, uint64_t ttlSecondsFromNow);
Standalone<StringRef> makeToken(int64_t tenantId, uint64_t ttlSecondsFromNow);
static thread_local ProcessInfo* currentProcess;

View File

@ -172,7 +172,7 @@ void ISimulator::displayWorkers() const {
return;
}
Standalone<StringRef> ISimulator::makeToken(StringRef tenantName, uint64_t ttlSecondsFromNow) {
Standalone<StringRef> ISimulator::makeToken(int64_t tenantId, uint64_t ttlSecondsFromNow) {
ASSERT_GT(authKeys.size(), 0);
auto tokenSpec = authz::jwt::TokenRef{};
auto [keyName, key] = *authKeys.begin();
@ -186,7 +186,7 @@ Standalone<StringRef> ISimulator::makeToken(StringRef tenantName, uint64_t ttlSe
tokenSpec.expiresAtUnixTime = now + ttlSecondsFromNow;
auto const tokenId = deterministicRandom()->randomAlphaNumeric(10);
tokenSpec.tokenId = StringRef(tokenId);
tokenSpec.tenants = VectorRef<StringRef>(&tenantName, 1);
tokenSpec.tenants = VectorRef<int64_t>(&tenantId, 1);
auto ret = Standalone<StringRef>();
ret.contents() = authz::jwt::signToken(ret.arena(), tokenSpec, key);
return ret;

View File

@ -46,6 +46,7 @@ struct AuthzSecurityWorkload : TestWorkload {
std::vector<Future<Void>> clients;
Arena arena;
Reference<Tenant> tenant;
Reference<Tenant> anotherTenant;
TenantName tenantName;
TenantName anotherTenantName;
Standalone<StringRef> signedToken;
@ -68,10 +69,6 @@ struct AuthzSecurityWorkload : TestWorkload {
tLogConfigKey = getOption(options, "tLogConfigKey"_sr, "TLogInterface"_sr);
ASSERT(g_network->isSimulated());
// make it comfortably longer than the timeout of the workload
signedToken = g_simulator->makeToken(
tenantName, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
signedTokenAnotherTenant = g_simulator->makeToken(
anotherTenantName, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
testFunctions.push_back(
[this](Database cx) { return testCrossTenantGetDisallowed(this, cx, PositiveTestcase::True); });
testFunctions.push_back(
@ -87,10 +84,15 @@ struct AuthzSecurityWorkload : TestWorkload {
Future<Void> setup(Database const& cx) override {
tenant = makeReference<Tenant>(cx, tenantName);
return tenant->ready();
anotherTenant = makeReference<Tenant>(cx, anotherTenantName);
return tenant->ready() && anotherTenant->ready();
}
Future<Void> start(Database const& cx) override {
signedToken = g_simulator->makeToken(
tenant->id(), uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
signedTokenAnotherTenant = g_simulator->makeToken(
anotherTenant->id(), uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
for (int c = 0; c < actorCount; c++)
clients.push_back(timeout(runTestClient(this, cx->clone()), testDuration, Void()));
return waitForAll(clients);

View File

@ -24,6 +24,7 @@
#include "flow/serialize.h"
#include "fdbrpc/simulator.h"
#include "fdbrpc/TokenSign.h"
#include "fdbrpc/TenantInfo.h"
#include "fdbclient/FDBOptions.g.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/TesterInterface.actor.h"
@ -39,13 +40,20 @@ template <>
struct CycleMembers<true> {
Arena arena;
TenantName tenant;
int64_t tenantId;
Standalone<StringRef> signedToken;
bool useToken;
};
template <bool>
struct CycleWorkload;
ACTOR Future<Void> prepareToken(Database cx, CycleWorkload<true>* self);
template <bool MultiTenancy>
struct CycleWorkload : TestWorkload, CycleMembers<MultiTenancy> {
static constexpr auto NAME = MultiTenancy ? "TenantCycle" : "Cycle";
static constexpr auto TenantEnabled = MultiTenancy;
int actorCount, nodeCount;
double testDuration, transactionsPerSecond, minExpectedTransactionsPerSecond, traceParentProbability;
Key keyPrefix;
@ -68,17 +76,18 @@ struct CycleWorkload : TestWorkload, CycleMembers<MultiTenancy> {
ASSERT(g_network->isSimulated());
this->useToken = getOption(options, "useToken"_sr, true);
this->tenant = getOption(options, "tenant"_sr, "CycleTenant"_sr);
// make it comfortably longer than the timeout of the workload
this->signedToken = g_simulator->makeToken(
this->tenant, uint64_t(std::lround(getCheckTimeout())) + uint64_t(std::lround(testDuration)) + 100);
this->tenantId = TenantInfo::INVALID_TENANT;
}
}
Future<Void> setup(Database const& cx) override {
Future<Void> prepare;
if constexpr (MultiTenancy) {
cx->defaultTenant = this->tenant;
prepare = prepareToken(cx, this);
} else {
prepare = Void();
}
return bulkSetup(cx, this, nodeCount, Promise<double>());
return runAfter(prepare, [this, cx](Void) { return bulkSetup(cx, this, nodeCount, Promise<double>()); });
}
Future<Void> start(Database const& cx) override {
if constexpr (MultiTenancy) {
@ -316,5 +325,17 @@ struct CycleWorkload : TestWorkload, CycleMembers<MultiTenancy> {
}
};
ACTOR Future<Void> prepareToken(Database cx, CycleWorkload<true>* self) {
cx->defaultTenant = self->tenant;
int64_t tenantId = wait(cx->lookupTenant(self->tenant));
self->tenantId = tenantId;
ASSERT_NE(self->tenantId, TenantInfo::INVALID_TENANT);
// make the lifetime comfortably longer than the timeout of the workload
self->signedToken = g_simulator->makeToken(self->tenantId,
uint64_t(std::lround(self->getCheckTimeout())) +
uint64_t(std::lround(self->testDuration)) + 100);
return Void();
}
WorkloadFactory<CycleWorkload<false>> CycleWorkloadFactory(UntrustedMode::False);
WorkloadFactory<CycleWorkload<true>> TenantCycleWorkloadFactory(UntrustedMode::True);

View File

@ -126,7 +126,7 @@ if(WITH_PYTHON)
add_fdb_test(TEST_FILES fast/AtomicBackupToDBCorrectness.toml)
add_fdb_test(TEST_FILES fast/AtomicOps.toml)
add_fdb_test(TEST_FILES fast/AtomicOpsApiCorrectness.toml)
add_fdb_test(TEST_FILES fast/AuthzSecurity.toml IGNORE) # TODO re-enable once authz uses ID tokens
add_fdb_test(TEST_FILES fast/AuthzSecurity.toml)
add_fdb_test(TEST_FILES fast/AutomaticIdempotency.toml)
add_fdb_test(TEST_FILES fast/BackupAzureBlobCorrectness.toml IGNORE)
add_fdb_test(TEST_FILES fast/BackupS3BlobCorrectness.toml IGNORE)
@ -491,14 +491,13 @@ if(WITH_PYTHON)
set(authz_script_dir ${CMAKE_SOURCE_DIR}/tests/authorization)
set(authz_test_cmd "${authz_venv_activate} && pytest ${authz_script_dir}/authz_test.py -rA --build-dir ${CMAKE_BINARY_DIR} -vvv")
# TODO: reenable when authz is updated to validate based on tenant IDs
#add_test(
#NAME token_based_tenant_authorization
#WORKING_DIRECTORY ${authz_venv_dir}
#COMMAND bash -c ${authz_test_cmd})
#set_tests_properties(token_based_tenant_authorization PROPERTIES ENVIRONMENT "PYTHONPATH=${CMAKE_SOURCE_DIR}/tests/TestRunner;${ld_env_name}=${CMAKE_BINARY_DIR}/lib")
#set_tests_properties(token_based_tenant_authorization PROPERTIES FIXTURES_REQUIRED authz_virtual_env)
#set_tests_properties(token_based_tenant_authorization PROPERTIES TIMEOUT 120)
add_test(
NAME token_based_tenant_authorization
WORKING_DIRECTORY ${authz_venv_dir}
COMMAND bash -c ${authz_test_cmd})
set_tests_properties(token_based_tenant_authorization PROPERTIES ENVIRONMENT "PYTHONPATH=${CMAKE_SOURCE_DIR}/tests/TestRunner;${ld_env_name}=${CMAKE_BINARY_DIR}/lib")
set_tests_properties(token_based_tenant_authorization PROPERTIES FIXTURES_REQUIRED authz_virtual_env)
set_tests_properties(token_based_tenant_authorization PROPERTIES TIMEOUT 120)
endif()
else()
message(WARNING "Python not found, won't configure ctest")

View File

@ -30,7 +30,7 @@ import time
from multiprocessing import Process, Pipe
from typing import Union
from authz_util import token_gen, private_key_gen, public_keyset_from_keys, alg_from_kty
from util import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes, KeyFileReverter, token_claim_1h, wait_until_tenant_tr_succeeds, wait_until_tenant_tr_fails
from util import random_alphanum_str, random_alphanum_bytes, to_str, to_bytes, KeyFileReverter, wait_until_tenant_tr_succeeds, wait_until_tenant_tr_fails
special_key_ranges = [
("transaction description", b"/description", b"/description\x00"),
@ -43,7 +43,7 @@ special_key_ranges = [
("kill storage", b"/globals/killStorage", b"/globals/killStorage\x00"),
]
def test_simple_tenant_access(cluster, default_tenant, tenant_tr_gen):
def test_simple_tenant_access(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
token = token_gen(cluster.private_key, token_claim_1h(default_tenant))
tr = tenant_tr_gen(default_tenant)
tr.options.set_authorization_token(token)
@ -53,7 +53,7 @@ def test_simple_tenant_access(cluster, default_tenant, tenant_tr_gen):
tr.options.set_authorization_token(token)
assert tr[b"abc"] == b"def", "tenant write transaction not visible"
def test_cross_tenant_access_disallowed(cluster, default_tenant, tenant_gen, tenant_tr_gen):
def test_cross_tenant_access_disallowed(cluster, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h):
# use default tenant token with second tenant transaction and see it fail
second_tenant = random_alphanum_bytes(12)
tenant_gen(second_tenant)
@ -137,7 +137,7 @@ def test_system_and_special_key_range_disallowed(db, tenant_tr_gen):
def test_public_key_set_rollover(
kty, public_key_refresh_interval,
cluster, default_tenant, tenant_gen, tenant_tr_gen):
cluster, default_tenant, tenant_gen, tenant_tr_gen, token_claim_1h):
new_kid = random_alphanum_str(12)
new_kty = "EC" if kty == "RSA" else "RSA"
new_key = private_key_gen(kty=new_kty, kid=new_kid)
@ -160,16 +160,16 @@ def test_public_key_set_rollover(
with KeyFileReverter(cluster.public_key_json_file, old_key_json, delay):
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(interim_set)
wait_until_tenant_tr_succeeds(second_tenant, new_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_succeeds(second_tenant, new_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
print("interim key set activated")
final_set = public_keyset_from_keys([new_key])
print(f"final keyset: {final_set}")
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(final_set)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
def test_public_key_set_broken_file_tolerance(
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen):
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h):
delay = public_key_refresh_interval
# retry limit in waiting for keyset file update to propagate to FDB server's internal keyset
max_repeat = 10
@ -187,10 +187,10 @@ def test_public_key_set_broken_file_tolerance(
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write('{"keys":[]}')
# eventually internal key set will become empty and won't accept any new tokens
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
def test_public_key_set_deletion_tolerance(
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen):
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h):
delay = public_key_refresh_interval
# retry limit in waiting for keyset file update to propagate to FDB server's internal keyset
max_repeat = 10
@ -200,16 +200,16 @@ def test_public_key_set_deletion_tolerance(
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write('{"keys":[]}')
time.sleep(delay)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
os.remove(cluster.public_key_json_file)
time.sleep(delay * 2)
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(cluster.public_key_jwks_str)
# eventually updated key set should take effect and transaction should be accepted
wait_until_tenant_tr_succeeds(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_succeeds(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
def test_public_key_set_empty_file_tolerance(
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen):
cluster, public_key_refresh_interval, default_tenant, tenant_tr_gen, token_claim_1h):
delay = public_key_refresh_interval
# retry limit in waiting for keyset file update to propagate to FDB server's internal keyset
max_repeat = 10
@ -219,7 +219,7 @@ def test_public_key_set_empty_file_tolerance(
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write('{"keys":[]}')
# eventually internal key set will become empty and won't accept any new tokens
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_fails(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
# empty the key file
with open(cluster.public_key_json_file, "w") as keyfile:
pass
@ -227,9 +227,9 @@ def test_public_key_set_empty_file_tolerance(
with open(cluster.public_key_json_file, "w") as keyfile:
keyfile.write(cluster.public_key_jwks_str)
# eventually key file should update and transactions should go through
wait_until_tenant_tr_succeeds(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay)
wait_until_tenant_tr_succeeds(default_tenant, cluster.private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h)
def test_bad_token(cluster, default_tenant, tenant_tr_gen):
def test_bad_token(cluster, default_tenant, tenant_tr_gen, token_claim_1h):
def del_attr(d, attr):
del d[attr]
return d

View File

@ -18,10 +18,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import fdb
import pytest
import subprocess
import admin_server
import time
from local_cluster import TLSConfig
from tmp_cluster import TempCluster
from typing import Union
@ -134,3 +136,22 @@ def tenant_tr_gen(db):
tenant = db.open_tenant(to_bytes(tenant))
return tenant.create_transaction()
return fn
@pytest.fixture
def token_claim_1h(db):
# JWT claim that is valid for 1 hour since time of invocation
def get_claim(tenant_name):
tenant = db.open_tenant(to_bytes(tenant_name))
tenant_id = tenant.get_id().wait()
now = time.time()
return {
"iss": "fdb-authz-tester",
"sub": "authz-test",
"aud": ["tmp-cluster"],
"iat": now,
"nbf": now - 1,
"exp": now + 60 * 60,
"jti": random_alphanum_str(10),
"tenants": [to_str(base64.b64encode(tenant_id.to_bytes(8, "big")))],
}
return get_claim

View File

@ -48,23 +48,9 @@ class KeyFileReverter(object):
print(f"key file reverted. waiting {self.refresh_delay * 2} seconds for the update to take effect...")
time.sleep(self.refresh_delay * 2)
# JWT claim that is valid for 1 hour since time of invocation
def token_claim_1h(tenant_name):
now = time.time()
return {
"iss": "fdb-authz-tester",
"sub": "authz-test",
"aud": ["tmp-cluster"],
"iat": now,
"nbf": now - 1,
"exp": now + 60 * 60,
"jti": random_alphanum_str(10),
"tenants": [to_str(base64.b64encode(tenant_name))],
}
# repeat try-wait loop up to max_repeat times until both read and write tr fails for tenant with permission_denied
# important: only use this function if you don't have any data dependencies to key "abc"
def wait_until_tenant_tr_fails(tenant, private_key, tenant_tr_gen, max_repeat, delay):
def wait_until_tenant_tr_fails(tenant, private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h):
repeat = 0
read_blocked = False
write_blocked = False
@ -97,7 +83,7 @@ def wait_until_tenant_tr_fails(tenant, private_key, tenant_tr_gen, max_repeat, d
# repeat try-wait loop up to max_repeat times until both read and write tr succeeds for tenant
# important: only use this function if you don't have any data dependencies to key "abc"
def wait_until_tenant_tr_succeeds(tenant, private_key, tenant_tr_gen, max_repeat, delay):
def wait_until_tenant_tr_succeeds(tenant, private_key, tenant_tr_gen, max_repeat, delay, token_claim_1h):
repeat = 0
token = token_gen(private_key, token_claim_1h(tenant))
while repeat < max_repeat: