foundationdb/fdbrpc/FlowTransport.h

227 lines
8.4 KiB
C
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* FlowTransport.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
2017-05-26 04:48:44 +08:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
2017-05-26 04:48:44 +08:00
* http://www.apache.org/licenses/LICENSE-2.0
*
2017-05-26 04:48:44 +08:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_TRANSPORT_H
#define FLOW_TRANSPORT_H
#pragma once
#include <algorithm>
2019-06-25 06:19:33 +08:00
#include "flow/genericactors.actor.h"
2017-05-26 04:48:44 +08:00
#include "flow/network.h"
#include "flow/FileIdentifier.h"
#include "flow/Net2Packet.h"
2020-10-07 08:01:13 +08:00
#include "fdbrpc/ContinuousSample.h"
2017-05-26 04:48:44 +08:00
#pragma pack(push, 4)
class Endpoint {
public:
// Endpoint represents a particular service (e.g. a serialized Promise<T> or PromiseStream<T>)
// An endpoint is either "local" (used for receiving data) or "remote" (used for sending data)
constexpr static FileIdentifier file_identifier = 10618805;
2017-05-26 04:48:44 +08:00
typedef UID Token;
NetworkAddressList addresses;
2017-05-26 04:48:44 +08:00
Token token;
Endpoint() {}
Endpoint(const NetworkAddressList& addresses, Token token) : addresses(addresses), token(token) {
choosePrimaryAddress();
}
void choosePrimaryAddress() {
if(addresses.secondaryAddress.present() && !g_network->getLocalAddresses().secondaryAddress.present() && (addresses.address.isTLS() != g_network->getLocalAddresses().address.isTLS())) {
std::swap(addresses.address, addresses.secondaryAddress.get());
}
}
2017-05-26 04:48:44 +08:00
bool isValid() const { return token.isValid(); }
bool isLocal() const;
// Return the primary network address, which is the first network address among
// all addresses this endpoint listens to.
const NetworkAddress& getPrimaryAddress() const {
return addresses.address;
}
bool operator == (Endpoint const& r) const {
2019-01-09 23:41:02 +08:00
return getPrimaryAddress() == r.getPrimaryAddress() && token == r.token;
}
bool operator != (Endpoint const& r) const {
return !(*this == r);
}
bool operator < (Endpoint const& r) const {
2019-01-09 23:41:02 +08:00
const NetworkAddress& left = getPrimaryAddress();
const NetworkAddress& right = r.getPrimaryAddress();
if (left != right)
return left < right;
else
return token < r.token;
}
2017-05-26 04:48:44 +08:00
template <class Ar>
void serialize(Ar& ar) {
2019-02-21 03:52:47 +08:00
if constexpr (is_fb_function<Ar>) {
serializer(ar, addresses, token);
2019-02-21 03:52:47 +08:00
if constexpr (Ar::isDeserializing) {
choosePrimaryAddress();
}
2019-02-21 03:52:47 +08:00
} else {
2019-06-19 08:55:27 +08:00
if (ar.isDeserializing && !ar.protocolVersion().hasEndpointAddrList()) {
2019-04-10 05:29:21 +08:00
addresses.secondaryAddress = Optional<NetworkAddress>();
serializer(ar, addresses.address, token);
2019-02-21 03:52:47 +08:00
} else {
serializer(ar, addresses, token);
if (ar.isDeserializing) {
choosePrimaryAddress();
}
}
}
2017-05-26 04:48:44 +08:00
}
};
#pragma pack(pop)
class ArenaObjectReader;
2017-05-26 04:48:44 +08:00
class NetworkMessageReceiver {
public:
virtual void receive( ArenaReader& ) = 0;
virtual void receive(ArenaObjectReader&) = 0;
2017-05-26 04:48:44 +08:00
virtual bool isStream() const { return false; }
};
struct TransportData;
struct Peer : public ReferenceCounted<Peer> {
TransportData* transport;
NetworkAddress destination;
UnsentPacketQueue unsent;
ReliablePacketList reliable;
AsyncTrigger dataToSend; // Triggered when unsent.empty() becomes false
Future<Void> connect;
AsyncTrigger resetPing;
AsyncTrigger resetConnection;
bool compatible;
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
double reconnectionDelay;
int peerReferences;
bool incompatibleProtocolVersionNewer;
int64_t bytesReceived;
double lastDataPacketSentTime;
int outstandingReplies;
ContinuousSample<double> pingLatencies;
2020-10-07 07:07:35 +08:00
int64_t lastLoggedBytesReceived;
explicit Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()),
2020-10-07 08:38:09 +08:00
pingLatencies(destination.isPublic() ? FLOW_KNOBS->PING_SAMPLE_AMOUNT : 1), lastLoggedBytesReceived(0) {}
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);
void prependConnectPacket();
void discardUnreliablePackets();
void onIncomingConnection( Reference<Peer> self, Reference<IConnection> conn, Future<Void> reader );
};
2017-05-26 04:48:44 +08:00
class FlowTransport {
public:
FlowTransport(uint64_t transportId);
~FlowTransport();
static void createInstance(bool isClient, uint64_t transportId);
2017-05-26 04:48:44 +08:00
// Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global() variables,
// so it will be private to a simulation.
2019-05-30 04:43:21 +08:00
static bool isClient() { return g_network->global(INetwork::enClientFailureMonitor) != nullptr; }
2017-05-26 04:48:44 +08:00
void initMetrics();
// Metrics must be initialized after FlowTransport::createInstance has been called
Future<Void> bind( NetworkAddress publicAddress, NetworkAddress listenAddress );
// Starts a server listening on the given listenAddress, and sets publicAddress to be the public
// address of this server. Returns only errors.
NetworkAddress getLocalAddress() const;
// Returns first local NetworkAddress.
2017-05-26 04:48:44 +08:00
NetworkAddressList getLocalAddresses() const;
// Returns all local NetworkAddress.
2017-05-26 04:48:44 +08:00
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the same of all peers that have attempted to connect, but have incompatible protocol versions
void addPeerReference(const Endpoint&, bool isStream);
2018-07-11 04:10:29 +08:00
// Signal that a peer connection is being used, even if no messages are currently being sent to the peer
void removePeerReference(const Endpoint&, bool isStream);
2018-07-11 04:10:29 +08:00
// Signal that a peer connection is no longer being used
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID );
2017-05-26 04:48:44 +08:00
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver
void removeEndpoint( const Endpoint&, NetworkMessageReceiver* );
// The given local endpoint no longer delivers messages to the given receiver or uses resources
void addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, TaskPriority taskID );
2017-05-26 04:48:44 +08:00
// Sets endpoint to a new local endpoint (without changing its token) which delivers messages to the given receiver
// Implementations may have limitations on when this function is called and what endpoint.token may be!
ReliablePacket* sendReliable( ISerializeSource const& what, const Endpoint& destination );
2017-05-26 04:48:44 +08:00
// sendReliable will keep trying to deliver the data to the destination until cancelReliable is
// called. It will retry sending if the connection is closed or the failure manager reports
// the destination become available (edge triggered).
void cancelReliable( ReliablePacket* );
// Makes Packet "unreliable" (either the data or a connection close event will be delivered
2017-05-26 04:48:44 +08:00
// eventually). It can still be used safely to send a reply to a "reliable" request.
Reference<AsyncVar<bool>> getDegraded();
// This async var will be set to true when the process cannot connect to a public network address that the failure monitor thinks is healthy.
void resetConnection( NetworkAddress address );
// Forces the connection with this address to be reset
Reference<Peer> sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection );// { cancelReliable(sendReliable(what,destination)); }
2017-05-26 04:48:44 +08:00
int getEndpointCount();
// for tracing only
bool incompatibleOutgoingConnectionsPresent();
2017-05-26 04:48:44 +08:00
static FlowTransport& transport() { return *static_cast<FlowTransport*>((void*) g_network->global(INetwork::enFlowTransport)); }
static NetworkAddress getGlobalLocalAddress() { return transport().getLocalAddress(); }
static NetworkAddressList getGlobalLocalAddresses() { return transport().getLocalAddresses(); }
2017-05-26 04:48:44 +08:00
Endpoint loadedEndpoint(const UID& token);
2017-05-26 04:48:44 +08:00
private:
class TransportData* self;
};
inline bool Endpoint::isLocal() const {
const auto& localAddrs = FlowTransport::transport().getLocalAddresses();
return addresses.address == localAddrs.address || (localAddrs.secondaryAddress.present() && addresses.address == localAddrs.secondaryAddress.get());
2017-05-26 04:48:44 +08:00
}
#endif