From c25be5699aee4c70399460335045639ebdfae758 Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 10 Jul 2018 13:10:29 -0700 Subject: [PATCH 1/2] close unneeded connections --- fdbrpc/FlowTransport.actor.cpp | 30 ++++++++++++++++++++++++++++-- fdbrpc/FlowTransport.h | 6 ++++++ fdbrpc/fdbrpc.h | 9 +++++++-- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 5b1bee6e60..7452f0f605 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -226,9 +226,10 @@ struct Peer : NonCopyable { bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send double lastConnectTime; double reconnectionDelay; + int peerReferences; explicit Peer( TransportData* transport, NetworkAddress const& destination ) - : transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true) + : transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), peerReferences(-1) { connect = connectionKeeper(this); } @@ -305,6 +306,10 @@ struct Peer : NonCopyable { state RequestStream< ReplyPromise > remotePing( Endpoint( peer->destination, WLTOKEN_PING_PACKET ) ); loop { + if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { + throw connection_failed(); + } + Void _ = wait( delayJittered( FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME ) ); // SOMEDAY: Stop monitoring and close the connection after a long period of inactivity with no reliable or onDisconnect requests outstanding @@ -421,7 +426,7 @@ struct Peer : NonCopyable { if (e.code() == error_code_actor_cancelled) throw; // Try to recover, even from serious errors, by retrying - if(self->reliable.empty() && self->unsent.empty()) { + if(self->peerReferences <= 0 && self->reliable.empty() && self->unsent.empty()) { self->connect.cancel(); self->transport->peers.erase(self->destination); delete self; @@ -809,6 +814,27 @@ void FlowTransport::loadedEndpoint( Endpoint& endpoint ) { endpoint.address = g_currentDeliveryPeerAddress; } +void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) { + if (!receiver->isStream() || !endpoint.address.isValid()) return; + Peer* peer = self->getPeer(endpoint.address); + if(peer->peerReferences == -1) { + peer->peerReferences = 1; + } else { + peer->peerReferences++; + } +} + +void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) { + if (!receiver->isStream() || !endpoint.address.isValid()) return; + Peer* peer = self->getPeer(endpoint.address, false); + if(peer) { + peer->peerReferences--; + if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { + peer->incompatibleDataRead.trigger(); + } + } +} + void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, uint32_t taskID ) { endpoint.token = g_random->randomUniqueID(); if (receiver->isStream()) { diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 4a7209d88a..76fab1cf75 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -83,6 +83,12 @@ public: std::map>* getIncompatiblePeers(); // Returns the same of all peers that have attempted to connect, but have incompatible protocol versions + void addPeerReference( const Endpoint&, NetworkMessageReceiver* ); + // Signal that a peer connection is being used, even if no messages are currently being sent to the peer + + void removePeerReference( const Endpoint&, NetworkMessageReceiver* ); + // Signal that a peer connection is no longer being used + void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, uint32_t taskID ); // Sets endpoint to be a new local endpoint which delivers messages to the given receiver diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index bf9f32a512..f5957bc347 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -34,10 +34,15 @@ struct FlowReceiver : private NetworkMessageReceiver { bool m_isLocalEndpoint; FlowReceiver() : m_isLocalEndpoint(false) {} - FlowReceiver(Endpoint const& remoteEndpoint) : endpoint(remoteEndpoint), m_isLocalEndpoint(false) {} + FlowReceiver(Endpoint const& remoteEndpoint) : endpoint(remoteEndpoint), m_isLocalEndpoint(false) { + FlowTransport::transport().addPeerReference(endpoint, this); + } ~FlowReceiver() { - if (m_isLocalEndpoint) + if (m_isLocalEndpoint) { FlowTransport::transport().removeEndpoint(endpoint, this); + } else { + FlowTransport::transport().removePeerReference(endpoint, this); + } } bool isLocalEndpoint() { return m_isLocalEndpoint; } From a35d5e30d9f9220067866a7e158097d575f7ac4c Mon Sep 17 00:00:00 2001 From: Evan Tschannen Date: Tue, 10 Jul 2018 13:26:28 -0700 Subject: [PATCH 2/2] Added a SevError trace event in case peer references becomes negative --- fdbrpc/FlowTransport.actor.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 7452f0f605..90231769b8 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -829,6 +829,9 @@ void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessag Peer* peer = self->getPeer(endpoint.address, false); if(peer) { peer->peerReferences--; + if(peer->peerReferences < 0) { + TraceEvent(SevError, "InvalidPeerReferences").detail("References", peer->peerReferences).detail("Address", endpoint.address).detail("Token", endpoint.token); + } if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { peer->incompatibleDataRead.trigger(); }