Migrated Authz code to use JWT
This commit is contained in:
parent
799fe32346
commit
13d8b13722
|
@ -7478,7 +7478,7 @@ ACTOR Future<TenantMapEntry> blobGranuleGetTenantEntry(Transaction* self, Key ra
|
|||
self->trState->cx->getCachedLocation(self->getTenant().get(), rangeStartKey, Reverse::False);
|
||||
if (!cachedLocationInfo.present()) {
|
||||
KeyRangeLocationInfo l = wait(getKeyLocation_internal(self->trState->cx,
|
||||
self->getTenant().get(),
|
||||
self->trState->getTenantInfo(true),
|
||||
rangeStartKey,
|
||||
self->trState->spanContext,
|
||||
self->trState->debugID,
|
||||
|
|
|
@ -407,7 +407,7 @@ std::string constructResourcePath(Reference<S3BlobStoreEndpoint> b, std::string
|
|||
resource += object;
|
||||
}
|
||||
|
||||
return std::move(resource);
|
||||
return resource;
|
||||
}
|
||||
|
||||
ACTOR Future<bool> bucketExists_impl(Reference<S3BlobStoreEndpoint> b, std::string bucket) {
|
||||
|
|
|
@ -308,15 +308,21 @@ public:
|
|||
bool empty() const { return queue.empty(); }
|
||||
};
|
||||
|
||||
using SignedAuthTokenTTL = std::pair<double, SignedAuthToken>;
|
||||
struct SignedAuthTokenTTL {
|
||||
Arena arena;
|
||||
double expirationTime;
|
||||
StringRef serializedToken;
|
||||
authz::jwt::TokenRef token;
|
||||
};
|
||||
struct SignedAuthTokenTTLCmp {
|
||||
constexpr bool operator()(const SignedAuthTokenTTL& lhs, const SignedAuthTokenTTL& rhs) const {
|
||||
return lhs.first > rhs.first;
|
||||
return lhs.expirationTime > rhs.expirationTime;
|
||||
}
|
||||
};
|
||||
|
||||
struct SignedAuthTokenCmp {
|
||||
bool operator()(const SignedAuthTokenTTL& lhs, const SignedAuthTokenTTL& rhs) const {
|
||||
return lhs.second.signature == rhs.second.signature;
|
||||
return lhs.token.signature == rhs.token.signature;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -337,7 +343,7 @@ class TokenCache : NonCopyable {
|
|||
};
|
||||
|
||||
class LRUCache {
|
||||
using ListEntry = std::pair<StringRef, CachedToken>;
|
||||
using ListEntry = StringRef;
|
||||
using List = std::list<ListEntry>;
|
||||
List list;
|
||||
boost::unordered_map<StringRef, List::iterator> map;
|
||||
|
@ -346,13 +352,14 @@ class TokenCache : NonCopyable {
|
|||
void deleteOldestIfFull() {
|
||||
while (map.size() > max_size) {
|
||||
auto last = list.rbegin();
|
||||
map.erase(last->first);
|
||||
map.erase(*last);
|
||||
list.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void add(StringRef signature, CachedToken&& token) {
|
||||
list.emplace_front(signature, std::move(token));
|
||||
void add(StringRef signature) {
|
||||
list.emplace_front(signature);
|
||||
map[signature] = list.begin();
|
||||
deleteOldestIfFull();
|
||||
}
|
||||
|
@ -364,7 +371,7 @@ class TokenCache : NonCopyable {
|
|||
// we don't need to update the LRU
|
||||
return true;
|
||||
} else {
|
||||
list.emplace_front(signature, iter->second->second);
|
||||
list.emplace_front(signature);
|
||||
list.erase(iter->second);
|
||||
return true;
|
||||
}
|
||||
|
@ -383,7 +390,7 @@ class TokenCache : NonCopyable {
|
|||
StringRef signature = self->priorityQueue.top().second;
|
||||
auto iter = self->tokens.find(signature);
|
||||
ASSERT(iter != self->tokens.end());
|
||||
self->expiredTokens.add(signature, CachedToken(iter->second));
|
||||
self->expiredTokens.add(signature);
|
||||
self->tokens.erase(iter);
|
||||
self->priorityQueue.pop();
|
||||
}
|
||||
|
@ -399,37 +406,63 @@ class TokenCache : NonCopyable {
|
|||
public:
|
||||
explicit TokenCache() { cleaner = cleanerJob(this); }
|
||||
|
||||
void addToken(Reference<AuthorizedTenants> tenants, SignedAuthTokenRef token) {
|
||||
auto iter = tokens.find(token.signature);
|
||||
void addToken(Reference<AuthorizedTenants> tenants, StringRef token) {
|
||||
auto sig = authz::jwt::signaturePart(token);
|
||||
auto iter = tokens.find(sig);
|
||||
if (iter != tokens.end()) {
|
||||
tenants->add(iter->second.expiresAt, iter->second.tenants);
|
||||
} else if (expiredTokens.has(token.signature)) {
|
||||
} else if (expiredTokens.has(sig)) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "Expired");
|
||||
} else {
|
||||
auto key = FlowTransport::transport().getPublicKeyByName(token.keyName);
|
||||
if (key.present() && verifyToken(token, key.get())) {
|
||||
AuthTokenRef ref;
|
||||
ObjectReader r(token.token.begin(), AssumeVersion(g_network->protocolVersion()));
|
||||
r.deserialize(ref);
|
||||
Arena arena;
|
||||
authz::jwt::TokenRef t;
|
||||
if (!authz::jwt::parseToken(arena, t, token)) {
|
||||
return;
|
||||
}
|
||||
if (!t.keyId.present()) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "NoKeyID");
|
||||
return;
|
||||
}
|
||||
auto key = FlowTransport::transport().getPublicKeyByName(t.keyId.get());
|
||||
if (key.present() && authz::jwt::verifyToken(token, key.get())) {
|
||||
double currentTime = g_network->timer();
|
||||
if (!t.expiresAtUnixTime.present()) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "NoExpirationTime");
|
||||
} else if (double(t.expiresAtUnixTime.get()) <= currentTime) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "Expired");
|
||||
}
|
||||
if (!t.notBeforeUnixTime.present()) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "NoNotBefore");
|
||||
} else if (double(t.notBeforeUnixTime.get()) > currentTime) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "TokenNotYetValid");
|
||||
}
|
||||
if (!t.tenants.present()) {
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "NoTenants");
|
||||
}
|
||||
CachedToken c;
|
||||
c.expiresAt = ref.expiresAt;
|
||||
for (auto tenant : ref.tenants) {
|
||||
c.expiresAt = double(t.expiresAtUnixTime.get());
|
||||
for (auto tenant : t.tenants.get()) {
|
||||
c.tenants.push_back_deep(c.arena, tenant);
|
||||
}
|
||||
StringRef signature(c.arena, token.signature);
|
||||
if (ref.expiresAt <= now()) {
|
||||
expiredTokens.add(signature, std::move(c));
|
||||
TraceEvent(SevWarn, "InvalidToken")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
.detail("Reason", "Expired");
|
||||
} else {
|
||||
StringRef signature(c.arena, sig);
|
||||
priorityQueue.emplace(c.expiresAt, signature);
|
||||
tenants->add(c.expiresAt, c.tenants);
|
||||
tokens.emplace(signature, std::move(c));
|
||||
insert.trigger();
|
||||
}
|
||||
} else {
|
||||
TraceEvent(SevWarn, "InvalidSignature")
|
||||
.detail("From", g_currentDeliveryPeerAddress.address)
|
||||
|
@ -446,6 +479,7 @@ public:
|
|||
// needs to change, do not attempt to update it directly, use the setNetworkAddress API as it
|
||||
// will ensure the new toString() cached value is updated.
|
||||
class NetworkAddressCachedString {
|
||||
public:
|
||||
NetworkAddressCachedString() { setAddressList(NetworkAddressList()); }
|
||||
NetworkAddressCachedString(NetworkAddressList const& list) { setAddressList(list); }
|
||||
NetworkAddressList const& getAddressList() const { return addressList; }
|
||||
|
@ -1117,7 +1151,7 @@ void Peer::prependConnectPacket() {
|
|||
if (!transport->tokens.empty()) {
|
||||
AuthorizationRequest req;
|
||||
for (auto t : transport->tokens) {
|
||||
req.tokens.push_back(req.arena, t.second);
|
||||
req.tokens.push_back(req.arena, t.serializedToken);
|
||||
}
|
||||
++transport->countPacketsGenerated;
|
||||
SplitBuffer packetInfoBuffer;
|
||||
|
@ -1845,7 +1879,9 @@ FlowTransport::FlowTransport(uint64_t transportId, int maxWellKnownEndpoints, IP
|
|||
self->multiVersionCleanup = multiVersionCleanupWorker(self);
|
||||
if (g_network->isSimulated()) {
|
||||
for (auto const& p : g_simulator.authKeys) {
|
||||
self->publicKeys.emplace(p.first, p.second.publicKey);
|
||||
Standalone<StringRef> pk;
|
||||
pk = p.second.toPublicKey().writeDer(pk.arena());
|
||||
self->publicKeys.emplace(p.first, pk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2224,20 +2260,25 @@ HealthMonitor* FlowTransport::healthMonitor() {
|
|||
}
|
||||
|
||||
void FlowTransport::authorizationTokenAdd(StringRef signedToken) {
|
||||
ObjectReader reader(signedToken.begin(), AssumeVersion(g_network->protocolVersion()));
|
||||
SignedAuthTokenRef tokenRef;
|
||||
reader.deserialize(tokenRef);
|
||||
SignedAuthToken token(tokenRef);
|
||||
// we need the TTL to invalidate tokens on the client side
|
||||
auto authToken =
|
||||
ObjectReader::fromStringRef<AuthTokenRef>(token.token, AssumeVersion(g_network->protocolVersion()));
|
||||
if (authToken.expiresAt < now()) {
|
||||
TraceEvent(SevWarnAlways, "AddedExpiredToken").detail("Expired", authToken.expiresAt);
|
||||
SignedAuthTokenTTL token;
|
||||
token.serializedToken = StringRef(token.arena, signedToken);
|
||||
if (authz::jwt::parseToken(token.arena, token.token, token.serializedToken)) {
|
||||
TraceEvent(SevWarnAlways, "AddedInvalidToken").detail("Reason", "CanNotParse").log();
|
||||
return;
|
||||
}
|
||||
self->tokens.emplace(authToken.expiresAt, token);
|
||||
if (!token.token.expiresAtUnixTime.present()) {
|
||||
TraceEvent(SevWarnAlways, "AddedInvalidToken").detail("Reason", "NoExpirationTime").log();
|
||||
}
|
||||
double expiresAt = double(token.token.expiresAtUnixTime.get());
|
||||
if (expiresAt < timer()) {
|
||||
TraceEvent(SevWarnAlways, "AddedInvalidToken")
|
||||
.detail("Reason", "Expired")
|
||||
.detail("ExpirationTime", expiresAt)
|
||||
.log();
|
||||
}
|
||||
self->tokens.emplace(token);
|
||||
AuthorizationRequest req;
|
||||
req.tokens.push_back(req.arena, token);
|
||||
req.tokens.push_back(req.arena, signedToken);
|
||||
// send the token to all existing peers
|
||||
for (auto peer : self->peers) {
|
||||
NetworkAddressList addr;
|
||||
|
@ -2259,7 +2300,8 @@ Optional<StringRef> FlowTransport::getPublicKeyByName(StringRef name) const {
|
|||
}
|
||||
|
||||
void FlowTransport::addPublicKey(StringRef name, StringRef key) {
|
||||
self->publicKeys[name] = key;
|
||||
Standalone<StringRef>& pk = self->publicKeys[name];
|
||||
pk = StringRef(pk.arena(), key);
|
||||
}
|
||||
|
||||
void FlowTransport::removePublicKey(StringRef name) {
|
||||
|
|
|
@ -20,17 +20,19 @@
|
|||
|
||||
#ifndef FLOW_TRANSPORT_H
|
||||
#define FLOW_TRANSPORT_H
|
||||
#include "flow/Arena.h"
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "fdbrpc/HealthMonitor.h"
|
||||
#include "flow/genericactors.actor.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/FileIdentifier.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include "flow/Net2Packet.h"
|
||||
#include "fdbrpc/ContinuousSample.h"
|
||||
#include "flow/Arena.h"
|
||||
#include "flow/PKey.h"
|
||||
|
||||
enum {
|
||||
WLTOKEN_ENDPOINT_NOT_FOUND = 0,
|
||||
|
|
|
@ -71,7 +71,7 @@ struct AuthorizationRequest {
|
|||
constexpr static FileIdentifier file_identifier = 11499331;
|
||||
|
||||
Arena arena;
|
||||
VectorRef<SignedAuthTokenRef> tokens;
|
||||
VectorRef<StringRef> tokens;
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
|
|
@ -302,6 +302,10 @@ StringRef signToken(Arena& arena, TokenRef tokenSpec, PrivateKey privateKey) {
|
|||
return StringRef(out, totalLen);
|
||||
}
|
||||
|
||||
StringRef signToken(Arena& arena, TokenRef tokenSpec, StringRef privateKeyDer) {
|
||||
return signToken(arena, tokenSpec, PrivateKey(DerEncoded{}, privateKeyDer));
|
||||
}
|
||||
|
||||
bool parseHeaderPart(TokenRef& token, StringRef b64urlHeader) {
|
||||
auto tmpArena = Arena();
|
||||
auto [header, valid] = base64url::decode(tmpArena, b64urlHeader);
|
||||
|
@ -412,6 +416,10 @@ bool parseSignaturePart(Arena& arena, TokenRef& token, StringRef b64urlSignature
|
|||
return valid;
|
||||
}
|
||||
|
||||
StringRef signaturePart(StringRef token) {
|
||||
return token.eat("."_sr).eat("."_sr);
|
||||
}
|
||||
|
||||
bool parseToken(Arena& arena, TokenRef& token, StringRef signedToken) {
|
||||
auto b64urlHeader = signedToken.eat("."_sr);
|
||||
auto b64urlPayload = signedToken.eat("."_sr);
|
||||
|
@ -446,6 +454,10 @@ bool verifyToken(StringRef signedToken, PublicKey publicKey) {
|
|||
return verifyStringSignature(b64urlTokenPart, sig, publicKey, keyAlgNid, digest);
|
||||
}
|
||||
|
||||
bool verifyToken(StringRef signedToken, StringRef publicKeyDer) {
|
||||
return verifyToken(signedToken, PublicKey(DerEncoded{}, publicKeyDer));
|
||||
}
|
||||
|
||||
TokenRef makeRandomTokenSpec(Arena& arena, IRandom& rng, Algorithm alg) {
|
||||
if (alg != Algorithm::ES256) {
|
||||
throw unsupported_operation();
|
||||
|
|
|
@ -56,10 +56,6 @@ struct TokenRef {
|
|||
|
||||
struct SignedTokenRef {
|
||||
static constexpr FileIdentifier file_identifier = 5916732;
|
||||
SignedAuthTokenRef() {}
|
||||
SignedAuthTokenRef(Arena& p, const SignedAuthTokenRef& other)
|
||||
: token(p, other.token), keyName(p, other.keyName), signature(p, other.signature) {}
|
||||
|
||||
StringRef token;
|
||||
StringRef keyName;
|
||||
StringRef signature;
|
||||
|
@ -124,6 +120,9 @@ bool parsePayloadPart(Arena& arena, TokenRef& tokenOut, StringRef b64urlPayloadI
|
|||
// using memory allocated from arena
|
||||
bool parseSignaturePart(Arena& arena, TokenRef& tokenOut, StringRef b64urlSignatureIn);
|
||||
|
||||
// Returns the base64 encoded signature of the token
|
||||
StringRef signaturePart(StringRef token);
|
||||
|
||||
// Parse passed token string and materialize its contents into tokenOut,
|
||||
// using memory allocated from arena
|
||||
// Return whether the signed token string is well-formed
|
||||
|
@ -134,13 +133,4 @@ bool verifyToken(StringRef signedToken, StringRef publicKeyDer);
|
|||
|
||||
} // namespace authz::jwt
|
||||
|
||||
// Below method is intented to be used for testing only
|
||||
|
||||
struct KeyPairRef {
|
||||
StringRef privateKey;
|
||||
StringRef publicKey;
|
||||
};
|
||||
|
||||
Standalone<KeyPairRef> generateEcdsaKeyPair();
|
||||
|
||||
#endif // FDBRPC_TOKEN_SIGN_H
|
||||
|
|
|
@ -461,7 +461,7 @@ public:
|
|||
|
||||
bool allowStorageMigrationTypeChange = false;
|
||||
|
||||
std::unordered_map<Standalone<StringRef>, Standalone<KeyPairRef>> authKeys;
|
||||
std::unordered_map<Standalone<StringRef>, PrivateKey> authKeys;
|
||||
|
||||
flowGlobalType global(int id) const final { return getCurrentProcess()->global(id); };
|
||||
void setGlobal(size_t id, flowGlobalType v) final { getCurrentProcess()->setGlobal(id, v); };
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/versions.h"
|
||||
#include "fdbclient/WellKnownEndpoints.h"
|
||||
#include "flow/MkCert.h"
|
||||
#include "flow/ProtocolVersion.h"
|
||||
#include "flow/network.h"
|
||||
#include "flow/TypeTraits.h"
|
||||
|
@ -59,7 +60,7 @@ ISimulator::ISimulator()
|
|||
allowLogSetKills(true), tssMode(TSSMode::Disabled), isStopped(false), lastConnectionFailure(0),
|
||||
connectionFailuresDisableDuration(0), speedUpSimulation(false), backupAgents(BackupAgentType::WaitForType),
|
||||
drAgents(BackupAgentType::WaitForType), allSwapsDisabled(false) {
|
||||
authKeys["defaultKey"_sr] = generateEcdsaKeyPair();
|
||||
authKeys["defaultKey"_sr] = mkcert::makeEcP256();
|
||||
}
|
||||
ISimulator::~ISimulator() = default;
|
||||
|
||||
|
@ -590,7 +591,8 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<IClusterConne
|
|||
WLTOKEN_RESERVED_COUNT,
|
||||
&allowList);
|
||||
for (const auto& p : g_simulator.authKeys) {
|
||||
FlowTransport::transport().addPublicKey(p.first, p.second.publicKey);
|
||||
Arena tmp;
|
||||
FlowTransport::transport().addPublicKey(p.first, p.second.toPublicKey().writeDer(tmp));
|
||||
}
|
||||
Sim2FileSystem::newFileSystem();
|
||||
|
||||
|
|
|
@ -39,8 +39,10 @@ struct CycleMembers {};
|
|||
|
||||
template <>
|
||||
struct CycleMembers<true> {
|
||||
Arena arena;
|
||||
TenantName tenant;
|
||||
SignedAuthToken token;
|
||||
authz::jwt::TokenRef token;
|
||||
StringRef signedToken;
|
||||
};
|
||||
|
||||
template <bool MultiTenancy>
|
||||
|
@ -64,15 +66,20 @@ struct CycleWorkload : TestWorkload, CycleMembers<MultiTenancy> {
|
|||
traceParentProbability = getOption(options, "traceParentProbability"_sr, 0.01);
|
||||
minExpectedTransactionsPerSecond = transactionsPerSecond * getOption(options, "expectedRate"_sr, 0.7);
|
||||
if constexpr (MultiTenancy) {
|
||||
this->tenant = getOption(options, "tenant"_sr, "CycleTenant"_sr);
|
||||
AuthTokenRef authToken;
|
||||
// make it confortably longer than the timeout of the workload
|
||||
authToken.expiresAt = now() + getCheckTimeout() + 100;
|
||||
authToken.tenants.push_back_deep(this->token.arena(), this->tenant);
|
||||
// we currently don't support this workload to be run outside of simulation
|
||||
ASSERT(g_network->isSimulated());
|
||||
auto k = g_simulator.authKeys.begin();
|
||||
this->token = signToken(authToken, k->first, k->second.privateKey);
|
||||
this->tenant = getOption(options, "tenant"_sr, "CycleTenant"_sr);
|
||||
// make it confortably longer than the timeout of the workload
|
||||
auto currentTime = uint64_t(lround(g_network->timer()));
|
||||
this->token.algorithm = authz::Algorithm::ES256;
|
||||
this->token.issuedAtUnixTime = currentTime;
|
||||
this->token.expiresAtUnixTime = currentTime + uint64_t(std::lround(getCheckTimeout())) + uint64_t(100);
|
||||
this->token.keyId = k->first;
|
||||
VectorRef<StringRef> tenants;
|
||||
tenants.push_back_deep(this->arena, this->tenant);
|
||||
this->token.tenants = tenants;
|
||||
// we currently don't support this workload to be run outside of simulation
|
||||
this->signedToken = authz::jwt::signToken(this->arena, this->token, k->second.writeDer(this->arena));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,9 +92,7 @@ struct CycleWorkload : TestWorkload, CycleMembers<MultiTenancy> {
|
|||
}
|
||||
Future<Void> setup(Database const& cx) override {
|
||||
if constexpr (MultiTenancy) {
|
||||
cx->defaultTenant = this->tenant;
|
||||
Value v = ObjectWriter::toValue(this->token, Unversioned());
|
||||
FlowTransport::transport().authorizationTokenAdd(v);
|
||||
FlowTransport::transport().authorizationTokenAdd(this->signedToken);
|
||||
}
|
||||
return bulkSetup(cx, this, nodeCount, Promise<double>());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue