Implemented public endpoints, started allow list

This commit is contained in:
Markus Pilman 2022-02-16 18:12:03 +01:00
parent 356ba4fc4b
commit c7899b9d39
10 changed files with 289 additions and 57 deletions

View File

@ -42,13 +42,13 @@ struct CommitProxyInterface {
Optional<Key> processId;
bool provisional;
RequestStream<struct CommitTransactionRequest> commit;
RequestStream<struct GetReadVersionRequest>
RequestStream<struct CommitTransactionRequest, true> commit;
RequestStream<struct GetReadVersionRequest, true>
getConsistentReadVersion; // Returns a version which (1) is committed, and (2) is >= the latest version reported
// committed (by a commit response) when this request was sent
// (at some point between when this request is sent and when its response is
// received, the latest version reported committed)
RequestStream<struct GetKeyServerLocationsRequest> getKeyServersLocations;
RequestStream<struct GetKeyServerLocationsRequest, true> getKeyServersLocations;
RequestStream<struct GetStorageServerRejoinInfoRequest> getStorageServerRejoinInfo;
RequestStream<ReplyPromise<Void>> waitFailure;

View File

@ -32,8 +32,8 @@
const int MAX_CLUSTER_FILE_BYTES = 60000;
struct ClientLeaderRegInterface {
RequestStream<struct GetLeaderRequest> getLeader;
RequestStream<struct OpenDatabaseCoordRequest> openDatabase;
RequestStream<struct GetLeaderRequest, true> getLeader;
RequestStream<struct OpenDatabaseCoordRequest, true> openDatabase;
RequestStream<struct CheckDescriptorMutableRequest> checkDescriptorMutable;
ClientLeaderRegInterface() {}

View File

@ -36,7 +36,7 @@ struct GrvProxyInterface {
Optional<Key> processId;
bool provisional;
RequestStream<struct GetReadVersionRequest>
RequestStream<struct GetReadVersionRequest, true>
getConsistentReadVersion; // Returns a version which (1) is committed, and (2) is >= the latest version reported
// committed (by a commit response) when this request was sent
// (at some point between when this request is sent and when its response is received, the latest version reported

View File

@ -60,30 +60,30 @@ struct StorageServerInterface {
UID uniqueID;
Optional<UID> tssPairID;
RequestStream<struct GetValueRequest> getValue;
RequestStream<struct GetKeyRequest> getKey;
RequestStream<struct GetValueRequest, true> getValue;
RequestStream<struct GetKeyRequest, true> getKey;
// Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large
// selector offset prevents all data from being read in one range read
RequestStream<struct GetKeyValuesRequest> getKeyValues;
RequestStream<struct GetKeyValuesAndFlatMapRequest> getKeyValuesAndFlatMap;
RequestStream<struct GetKeyValuesRequest, true> getKeyValues;
RequestStream<struct GetKeyValuesAndFlatMapRequest, true> getKeyValuesAndFlatMap;
RequestStream<struct GetShardStateRequest> getShardState;
RequestStream<struct GetShardStateRequest, true> getShardState;
RequestStream<struct WaitMetricsRequest> waitMetrics;
RequestStream<struct SplitMetricsRequest> splitMetrics;
RequestStream<struct GetStorageMetricsRequest> getStorageMetrics;
RequestStream<ReplyPromise<Void>> waitFailure;
RequestStream<ReplyPromise<Void>, true> waitFailure;
RequestStream<struct StorageQueuingMetricsRequest> getQueuingMetrics;
RequestStream<ReplyPromise<KeyValueStoreType>> getKeyValueStoreType;
RequestStream<struct WatchValueRequest> watchValue;
RequestStream<struct WatchValueRequest, true> watchValue;
RequestStream<struct ReadHotSubRangeRequest> getReadHotRanges;
RequestStream<struct SplitRangeRequest> getRangeSplitPoints;
RequestStream<struct GetKeyValuesStreamRequest> getKeyValuesStream;
RequestStream<struct ChangeFeedStreamRequest> changeFeedStream;
RequestStream<struct OverlappingChangeFeedsRequest> overlappingChangeFeeds;
RequestStream<struct ChangeFeedPopRequest> changeFeedPop;
RequestStream<struct ChangeFeedVersionUpdateRequest> changeFeedVersionUpdate;
RequestStream<struct GetKeyValuesStreamRequest, true> getKeyValuesStream;
RequestStream<struct ChangeFeedStreamRequest, true> changeFeedStream;
RequestStream<struct OverlappingChangeFeedsRequest, true> overlappingChangeFeeds;
RequestStream<struct ChangeFeedPopRequest, true> changeFeedPop;
RequestStream<struct ChangeFeedVersionUpdateRequest, true> changeFeedVersionUpdate;
explicit StorageServerInterface(UID uid) : uniqueID(uid) {}
StorageServerInterface() : uniqueID(deterministicRandom()->randomUniqueID()) {}

View File

@ -27,6 +27,7 @@
#include <memcheck.h>
#endif
#include "fdbrpc/TenantInfo.h"
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/HealthMonitor.h"
@ -204,6 +205,7 @@ struct EndpointNotFoundReceiver final : NetworkMessageReceiver {
Endpoint e = FlowTransport::transport().loadedEndpoint(token);
IFailureMonitor::failureMonitor().endpointNotFound(e);
}
bool isPublic() const override { return true; }
};
struct PingRequest {
@ -228,6 +230,29 @@ struct PingReceiver final : NetworkMessageReceiver {
PeerCompatibilityPolicy peerCompatibilityPolicy() const override {
return PeerCompatibilityPolicy{ RequirePeer::AtLeast, ProtocolVersion::withStableInterfaces() };
}
bool isPublic() const override { return true; }
};
struct TenantAuthorizer final : NetworkMessageReceiver {
void receive(ArenaObjectReader& reader) override {
AuthorizationRequest req;
try {
reader.deserialize(req);
// TODO: verify that token is valid
AuthorizedTenants& auth = reader.variable<AuthorizedTenants>("AuthorizedTenants");
for (auto const& t : req.tenants) {
auth.authorizedTenants.insert(TenantInfoRef(auth.arena, t));
}
req.reply.send(Void());
} catch (Error& e) {
if (e.code() == error_code_permission_denied) {
req.reply.sendError(e);
} else {
throw;
}
}
}
bool isPublic() const override { return true; }
};
class TransportData {
@ -918,6 +943,8 @@ ACTOR static void deliver(TransportData* self,
Endpoint destination,
TaskPriority priority,
ArenaReader reader,
NetworkAddress peerAddress,
AuthorizedTenants authorizedTenants,
bool inReadSocket) {
// We want to run the task at the right priority. If the priority is higher than the current priority (which is
// ReadSocket) we can just upgrade. Otherwise we'll context switch so that we don't block other tasks that might run
@ -932,6 +959,11 @@ ACTOR static void deliver(TransportData* self,
auto receiver = self->endpoints.get(destination.token);
if (receiver) {
if (!authorizedTenants.trusted && !receiver->isPublic()) {
TraceEvent(SevWarnAlways, "AttemptedRPCToPrivatePrevented")
.detail("From", peerAddress);
throw connection_failed();
}
if (!checkCompatible(receiver->peerCompatibilityPolicy(), reader.protocolVersion())) {
return;
}
@ -940,6 +972,9 @@ ACTOR static void deliver(TransportData* self,
StringRef data = reader.arenaReadAll();
ASSERT(data.size() > 8);
ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion()));
bool didInsert = objReader.variable<AuthorizedTenants>("AuthorizedTenants", &authorizedTenants);
didInsert = didInsert && objReader.variable<NetworkAddress>("PeerAddress", &peerAddress);
ASSERT(didInsert); // check that we could set both context variables
receiver->receive(objReader);
g_currentDeliveryPeerAddress = { NetworkAddress() };
} catch (Error& e) {
@ -977,6 +1012,7 @@ static void scanPackets(TransportData* transport,
const uint8_t* e,
Arena& arena,
NetworkAddress const& peerAddress,
AuthorizedTenants authorizedTenants,
ProtocolVersion peerProtocolVersion) {
// Find each complete packet in the given byte range and queue a ready task to deliver it.
// Remove the complete packets from the range by increasing unprocessed_begin.
@ -1090,7 +1126,13 @@ static void scanPackets(TransportData* transport,
// we have many messages to UnknownEndpoint we want to optimize earlier. As deliver is an actor it
// will allocate some state on the heap and this prevents it from doing that.
if (priority != TaskPriority::UnknownEndpoint || (token.first() & TOKEN_STREAM_FLAG) != 0) {
deliver(transport, Endpoint({ peerAddress }, token), priority, std::move(reader), true);
deliver(transport,
Endpoint({ peerAddress }, token),
priority,
std::move(reader),
peerAddress,
authorizedTenants,
true);
}
unprocessed_begin = p = p + packetLen;
@ -1136,8 +1178,11 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
state bool incompatibleProtocolVersionNewer = false;
state NetworkAddress peerAddress;
state ProtocolVersion peerProtocolVersion;
state AuthorizedTenants authorizedTenants;
peerAddress = conn->getPeerAddress();
// TODO: check whether peers ip is in trusted range
authorizedTenants.trusted = true;
if (!peer) {
ASSERT(!peerAddress.isPublic());
}
@ -1286,7 +1331,7 @@ ACTOR static Future<Void> connectionReader(TransportData* transport,
if (!expectConnectPacket) {
if (compatible || peerProtocolVersion.hasStableInterfaces()) {
scanPackets(
transport, unprocessed_begin, unprocessed_end, arena, peerAddress, peerProtocolVersion);
transport, unprocessed_begin, unprocessed_end, arena, peerAddress, authorizedTenants, peerProtocolVersion);
} else {
unprocessed_begin = unprocessed_end;
peer->resetPing.trigger();
@ -1554,8 +1599,15 @@ static void sendLocal(TransportData* self, ISerializeSource const& what, const E
ASSERT(copy.size() > 0);
TaskPriority priority = self->endpoints.getPriority(destination.token);
if (priority != TaskPriority::UnknownEndpoint || (destination.token.first() & TOKEN_STREAM_FLAG) != 0) {
deliver(
self, destination, priority, ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)), false);
AuthorizedTenants authorizedTenants;
authorizedTenants.trusted = true;
deliver(self,
destination,
priority,
ArenaReader(copy.arena(), copy, AssumeVersion(currentProtocolVersion)),
NetworkAddress(),
authorizedTenants,
false);
}
}

73
fdbrpc/TenantInfo.h Normal file
View File

@ -0,0 +1,73 @@
/*
* TenantInfo.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef FDBRPC_TENANTINFO_H_
#define FDBRPC_TENANTINFO_H_
#include "flow/Arena.h"
#include "fdbrpc/fdbrpc.h"
#include <set>
struct TenantInfoRef {
TenantInfoRef() {}
TenantInfoRef(Arena& p, StringRef toCopy) : tenantName(StringRef(p, toCopy)) {}
TenantInfoRef(Arena& p, TenantInfoRef toCopy)
: tenantName(toCopy.tenantName.present() ? Optional<StringRef>(StringRef(p, toCopy.tenantName.get()))
: Optional<StringRef>()) {}
// Empty tenant name means that the peer is trusted
Optional<StringRef> tenantName;
bool operator<(TenantInfoRef const& other) const {
if (!other.tenantName.present()) {
return false;
}
if (!tenantName.present()) {
return true;
}
return tenantName.get() < other.tenantName.get();
}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tenantName);
}
};
struct AuthorizedTenants {
Arena arena;
std::set<TenantInfoRef> authorizedTenants;
bool trusted = false;
};
// TODO: receive and validate token instead
struct AuthorizationRequest {
constexpr static FileIdentifier file_identifier = 11499331;
Arena arena;
VectorRef<TenantInfoRef> tenants;
ReplyPromise<Void> reply;
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, tenants, reply, arena);
}
};
#endif // FDBRPC_TENANTINFO_H_

View File

@ -101,6 +101,7 @@ enum {
OPT_TRACECLOCK, OPT_NUMTESTERS, OPT_DEVHELP, OPT_ROLLSIZE, OPT_MAXLOGS, OPT_MAXLOGSSIZE, OPT_KNOB, OPT_UNITTESTPARAM, OPT_TESTSERVERS, OPT_TEST_ON_SERVERS, OPT_METRICSCONNFILE,
OPT_METRICSPREFIX, OPT_LOGGROUP, OPT_LOCALITY, OPT_IO_TRUST_SECONDS, OPT_IO_TRUST_WARN_ONLY, OPT_FILESYSTEM, OPT_PROFILER_RSS_SIZE, OPT_KVFILE,
OPT_TRACE_FORMAT, OPT_WHITELIST_BINPATH, OPT_BLOB_CREDENTIAL_FILE, OPT_CONFIG_PATH, OPT_USE_TEST_CONFIG_DB, OPT_FAULT_INJECTION, OPT_PROFILER, OPT_PRINT_SIMTIME,
OPT_IP_TRUSTED_MASK,
};
CSimpleOpt::SOption g_rgOptions[] = {
@ -188,7 +189,8 @@ CSimpleOpt::SOption g_rgOptions[] = {
{ OPT_FAULT_INJECTION, "-fi", SO_REQ_SEP },
{ OPT_FAULT_INJECTION, "--fault-injection", SO_REQ_SEP },
{ OPT_PROFILER, "--profiler-", SO_REQ_SEP},
{ OPT_PRINT_SIMTIME, "--print-sim-time", SO_NONE },
{ OPT_PRINT_SIMTIME, "--print-sim-time", SO_NONE },
{ OPT_IP_TRUSTED_MASK, "--trusted-subnet-", SO_REQ_SEP },
#ifndef TLS_DISABLED
TLS_OPTION_FLAGS
@ -294,7 +296,7 @@ UID getSharedMemoryMachineId() {
std::string sharedMemoryIdentifier = "fdbserver_shared_memory_id";
loop {
try {
// "0" is the default parameter "addr"
// "0" is the default netPrefix "addr"
boost::interprocess::managed_shared_memory segment(
boost::interprocess::open_or_create, sharedMemoryIdentifier.c_str(), 1000, 0, p.permission);
machineId = segment.find_or_construct<UID>("machineId")(deterministicRandom()->randomUniqueID());
@ -1666,7 +1668,103 @@ private:
};
} // namespace
#include <fmt/printf.h>
struct AuthAllowedSubnet {
IPAddress baseAddress;
IPAddress addressMask;
AuthAllowedSubnet(IPAddress const& baseAddress, IPAddress const& addressMask)
: baseAddress(baseAddress), addressMask(addressMask) {
ASSERT(baseAddress.isV4() == addressMask.isV4());
}
static AuthAllowedSubnet fromString(std::string_view addressString) {
auto pos = addressString.find('/');
if (pos == std::string_view::npos) {
fmt::print("ERROR: {} is not a valid (use Network-Prefix/hostcount syntax)\n");
throw invalid_option();
}
auto address = addressString.substr(0, pos);
auto hostCount = std::stoi(std::string(addressString.substr(pos + 1)));
auto addr = boost::asio::ip::make_address(address);
if (addr.is_v4()) {
auto bM = createBitMask(addr.to_v4().to_bytes(), hostCount);
// we typically would expect a base address has been passed, but to be safe we still
// will make the last bits 0
auto mask = boost::asio::ip::address_v4(bM).to_uint();
auto baseAddress = addr.to_v4().to_uint() & mask;
return AuthAllowedSubnet(IPAddress(baseAddress), IPAddress(mask));
} else {
auto mask = createBitMask(addr.to_v6().to_bytes(), hostCount);
auto baseAddress = addr.to_v6().to_bytes();
for (int i = 0; i < mask.size(); ++i) {
baseAddress[i] &= mask[i];
}
return AuthAllowedSubnet(IPAddress(baseAddress), IPAddress(mask));
}
}
template <std::size_t sz>
static auto createBitMask(std::array<unsigned char, sz> const& addr, int hostCount) -> std::array<unsigned char, sz> {
std::array<unsigned char, sz> res;
res.fill((unsigned char)0xff);
for (auto idx = (hostCount / 8) - 1; idx < res.size(); ++idx) {
if (hostCount > 0) {
// 2^(hostCount % 8) - 1 sets the last (hostCount % 8) number of bits to 1
// everything else will be zero. For example: 2^3 - 1 == 7 == 0b111
unsigned char bitmask = (1 << (hostCount % 8)) - ((unsigned char)1);
res[idx] ^= bitmask;
} else {
res[idx] = (unsigned char)0;
}
hostCount = 0;
}
return res;
}
bool operator() (IPAddress const& address) const {
if (addressMask.isV4() != address.isV4()) {
return false;
}
if (addressMask.isV4()) {
return (addressMask.toV4() & address.toV4()) == baseAddress.toV4();
} else {
auto res = address.toV6();
auto const& mask = addressMask.toV6();
for (int i = 0; i < res.size(); ++i) {
res[i] &= mask[i];
}
return res == baseAddress.toV6();
}
}
};
template<std::size_t C>
void printIP(std::array<unsigned char, C> const& addr) {
for (auto c : addr) {
fmt::print(" {:02x}", int(c));
}
}
void printIP(std::string_view txt, IPAddress const& address) {
fmt::print("{}:", txt);
if (address.isV4()) {
printIP(boost::asio::ip::address_v4(address.toV4()).to_bytes());
} else {
printIP(address.toV6());
}
fmt::print("\n");
}
int main(int argc, char* argv[]) {
//auto allowed = AuthAllowedSubnet::fromString(argv[1]);
//printIP("Base Address", allowed.baseAddress);
//printIP("Address Mask", allowed.addressMask);
//for (int idx = 1; idx < argc; ++idx) {
// auto addr = IPAddress::parse(argv[idx]);
//}
//return 0;
// TODO: Remove later, this is just to force the statics to be initialized
// otherwise the unit test won't run
#ifdef ENABLE_SAMPLING

View File

@ -24,9 +24,13 @@
#include "flow/flat_buffers.h"
#include "flow/ProtocolVersion.h"
#include <unordered_map>
template <class Ar>
struct LoadContext {
Ar* ar;
std::unordered_map<std::string_view, void*> variables;
LoadContext(Ar* ar) : ar(ar) {}
Arena& arena() { return ar->arena(); }
@ -47,6 +51,23 @@ struct LoadContext {
void addArena(Arena& arena) { arena = ar->arena(); }
LoadContext& context() { return *this; }
template <class T>
bool variable(std::string_view name, T* val) {
auto p = variables.insert(std::make_pair(name, val));
return p.second;
}
template <class T>
T& variable(std::string_view name) {
auto res = variables.at(name);
return *reinterpret_cast<T*>(res);
}
template <class T>
T const& variable(std::string_view name) const {
return const_cast<LoadContext<Ar>*>(this)->variable<T>(name);
}
};
template <class Ar, class Allocator>
@ -69,15 +90,16 @@ template <class ReaderImpl>
class _ObjectReader {
protected:
ProtocolVersion mProtocolVersion;
LoadContext<ReaderImpl> context;
public:
_ObjectReader() : context(static_cast<ReaderImpl*>(this)) {}
ProtocolVersion protocolVersion() const { return mProtocolVersion; }
void setProtocolVersion(ProtocolVersion v) { mProtocolVersion = v; }
template <class... Items>
void deserialize(FileIdentifier file_identifier, Items&... items) {
const uint8_t* data = static_cast<ReaderImpl*>(this)->data();
LoadContext<ReaderImpl> context(static_cast<ReaderImpl*>(this));
if (read_file_identifier(data) != file_identifier) {
// Some file identifiers are changed in 7.0, so file identifier mismatches
// are expected during a downgrade from 7.0 to 6.3
@ -100,6 +122,21 @@ public:
void deserialize(Item& item) {
deserialize(FileIdentifierFor<Item>::value, item);
}
template <class T>
bool variable(std::string_view name, T* val) {
return context.template variable<T>(name, val);
}
template <class T>
T& variable(std::string_view name) {
return context.template variable<T>(name);
}
template <class T>
T const& variable(std::string_view name) const {
return context.template variable<T>(name);
}
};
class ObjectReader : public _ObjectReader<ObjectReader> {

View File

@ -273,6 +273,10 @@ ERROR( snap_invalid_uid_string, 2509, "The given uid string is not a 32-length h
// 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error
ERROR( internal_error, 4100, "An internal error occurred" )
// 5xxx Authorization and authentication error codes
ERROR( permission_denied, 5000, "Client is not allowed to access endpoint" )
ERROR( unauthorized_attempt, 5001, "A untrusted client tried to send a message to a private endpoint" )
// clang-format on
#undef ERROR

View File

@ -6,35 +6,3 @@ testTitle = 'Clogged'
transactionsPerSecond = 2500.0
testDuration = 10.0
expectedRate = 0
[[test.workload]]
testName = 'RandomClogging'
testDuration = 10.0
[[test.workload]]
testName = 'Rollback'
meanDelay = 10.0
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test.workload]]
testName = 'Attrition'
machinesToKill = 10
machinesToLeave = 3
reboot = true
testDuration = 10.0
[[test]]
testTitle = 'Unclogged'
[[test.workload]]
testName = 'Cycle'
transactionsPerSecond = 250.0
testDuration = 10.0
expectedRate = 0.80