From ccf97eb1876d819d5b121e572775311ecf694f29 Mon Sep 17 00:00:00 2001 From: Markus Pilman Date: Fri, 22 Apr 2022 17:06:05 -0600 Subject: [PATCH] Make transport work --- fdbrpc/FlowTransport.actor.cpp | 45 +++++++++++++++++++++------- fdbrpc/TokenSign.cpp | 4 +-- fdbserver/CMakeLists.txt | 1 + fdbserver/SimulatedCluster.actor.cpp | 3 ++ fdbserver/tester.actor.cpp | 4 ++- flow/serialize.h | 7 ++--- 6 files changed, 46 insertions(+), 18 deletions(-) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 7944391f44..279a7b0e67 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -251,7 +251,9 @@ struct TenantAuthorizer final : NetworkMessageReceiver { for (const auto& t : req.tokens) { auto key = transport.getPublicKeyByName(t.keyName); if (key.present() && verifyToken(t, key.get())) { - auto token = ObjectReader::fromStringRef(t.token, Unversioned()); + ObjectReader r(t.token.begin(), AssumeVersion(reader.protocolVersion())); + AuthTokenRef token; + r.deserialize(token); Reference& auth = std::any_cast&>(reader.variable("AuthorizedTenants")); auth->add(token.expiresAt, token.tenants); @@ -287,10 +289,14 @@ struct UnauthorizedEndpointReceiver final : NetworkMessageReceiver { bool isPublic() const override { return true; } }; -template , class Cmp = std::less> -class IterablePriorityQueue { +// A priority queue with two additional properties: +// - All values within the queue are unique +// - One can iterate over the queue +template , class Cmp = std::less, class ValEq = std::equal_to> +class IterableUniquePriorityQueue { Container queue; Cmp cmp; + ValEq valCmp; using const_iterator = typename Container::const_iterator; public: @@ -302,10 +308,18 @@ public: queue.push_back(std::move(val)); std::push_heap(queue.begin(), queue.end(), cmp); } + // runs in O(n) -- so this class should only be used if we expect the max size of the queue to be small template - void emplace(Args&&... args) { - queue.emplace_back(std::forward(args)...); + bool emplace(Args&&... args) { + T el(std::forward(args)...); + for (const auto& element : *this) { + if (valCmp(element, el)) { + return false; + } + } + queue.emplace_back(std::move(el)); std::push_heap(queue.begin(), queue.end(), cmp); + return true; } const T& front() const { return queue.begin(); } void pop() { @@ -319,9 +333,14 @@ public: using SignedAuthTokenTTL = std::pair; struct SignedAuthTokenTTLCmp { - bool operator()(const SignedAuthTokenTTL& lhs, const SignedAuthTokenTTL& rhs) { return lhs.first > rhs.first; } + constexpr bool operator()(const SignedAuthTokenTTL& lhs, const SignedAuthTokenTTL& rhs) const { return lhs.first > rhs.first; } }; -using TokenQueue = IterablePriorityQueue, SignedAuthTokenTTLCmp>; +struct SignedAuthTokenCmp { + constexpr bool operator()(const SignedAuthTokenTTL& lhs, const SignedAuthTokenTTL& rhs) const { return lhs.second.signature == rhs.second.signature; } +}; + + +using TokenQueue = IterableUniquePriorityQueue, SignedAuthTokenTTLCmp, SignedAuthTokenCmp>; class TransportData { public: @@ -954,7 +973,6 @@ void Peer::prependConnectPacket() { for (auto t : transport->tokens) { req.tokens.push_back(req.arena, t.second); } - SerializeSource what(req); ++transport->countPacketsGenerated; SplitBuffer packetInfoBuffer; uint32_t len; @@ -969,7 +987,10 @@ void Peer::prependConnectPacket() { } wr.writeAhead(packetInfoSize, &packetInfoBuffer); wr << Endpoint::wellKnownToken(WLTOKEN_AUTH_TENANT); - what.serializePacketWriter(wr); + ObjectWriter writer([&wr](size_t size) { return wr.writeBytes(size); }, + AssumeVersion(g_network->protocolVersion())); + writer.serialize(req); + // what.serializePacketWriter(wr); pb_end = wr.finish(); len = wr.size() - packetInfoSize - pkt.totalPacketSize(); if (checksumEnabled) { @@ -2046,10 +2067,12 @@ HealthMonitor* FlowTransport::healthMonitor() { } void FlowTransport::authorizationTokenAdd(StringRef signedToken) { - auto tokenRef = ObjectReader::fromStringRef(signedToken, Unversioned()); + ObjectReader reader(signedToken.begin(), AssumeVersion(g_network->protocolVersion())); + SignedAuthTokenRef tokenRef; + reader.deserialize(tokenRef); SignedAuthToken token(tokenRef); // we need the TTL to invalidate tokens on the client side - auto authToken = ObjectReader::fromStringRef(token.token, Unversioned()); + auto authToken = ObjectReader::fromStringRef(token.token, AssumeVersion(g_network->protocolVersion())); if (authToken.expiresAt < now()) { TraceEvent(SevWarnAlways, "AddedExpiredToken").detail("Expired", authToken.expiresAt); return; diff --git a/fdbrpc/TokenSign.cpp b/fdbrpc/TokenSign.cpp index 6e7a0092cf..6d4907014f 100644 --- a/fdbrpc/TokenSign.cpp +++ b/fdbrpc/TokenSign.cpp @@ -109,10 +109,10 @@ Standalone generateEcdsaKeyPair() { Standalone signToken(AuthTokenRef token, StringRef keyName, StringRef privateKeyDer) { auto ret = Standalone{}; - auto arena = ret.arena(); + auto& arena = ret.arena(); auto writer = ObjectWriter([&arena](size_t len) { return new (arena) uint8_t[len]; }, Unversioned()); writer.serialize(token); - auto tokenStr = writer.toStringRef(); + auto tokenStr = StringRef(arena, writer.toStringRef()); auto rawPrivKeyDer = privateKeyDer.begin(); auto key = ::d2i_AutoPrivateKey(nullptr, &rawPrivKeyDer, privateKeyDer.size()); diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 8ec4b437d3..ddbf67e3e9 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -207,6 +207,7 @@ set(FDBSERVER_SRCS workloads/ConflictRange.actor.cpp workloads/ConsistencyCheck.actor.cpp workloads/CpuProfiler.actor.cpp + workloads/CreateTenant.actor.cpp workloads/Cycle.actor.cpp workloads/DataDistributionMetrics.actor.cpp workloads/DataLossRecovery.actor.cpp diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 70e286cfe2..a35e464477 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -588,6 +588,9 @@ ACTOR Future simulatedFDBDRebooter(Reference> futures; diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 64b9924560..fb2d726a44 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -370,7 +370,9 @@ ACTOR Future> getWorkloadIface(WorkloadRequest work, wcx.sharedRandomNumber = work.sharedRandomNumber; workload = IWorkloadFactory::create(testName.toString(), wcx); - wait(workload->initialized()); + if (workload) { + wait(workload->initialized()); + } auto unconsumedOptions = checkAllOptionsConsumed(workload ? workload->options : VectorRef()); if (!workload || unconsumedOptions.size()) { diff --git a/flow/serialize.h b/flow/serialize.h index db58461eed..ba88b919e4 100644 --- a/flow/serialize.h +++ b/flow/serialize.h @@ -851,10 +851,6 @@ struct PacketWriter { } ProtocolVersion protocolVersion() const { return m_protocolVersion; } void setProtocolVersion(ProtocolVersion pv) { m_protocolVersion = pv; } - -private: - void serializeBytesAcrossBoundary(const void* data, int bytes); - void nextBuffer(size_t size = 0 /* downstream it will default to at least 4k minus some padding */); uint8_t* writeBytes(size_t size) { if (size > buffer->bytes_unwritten()) { nextBuffer(size); @@ -865,6 +861,9 @@ private: return result; } +private: + void serializeBytesAcrossBoundary(const void* data, int bytes); + void nextBuffer(size_t size = 0 /* downstream it will default to at least 4k minus some padding */); template friend class MakeSerializeSource;