diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 852bac71d4..478b501454 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -157,7 +157,7 @@ public: countConnClosedWithoutError.init(LiteralStringRef("Net2.CountConnClosedWithoutError")); } - struct Peer* getPeer( NetworkAddress const& address, bool doConnect = true ); + struct Peer* getPeer( NetworkAddress const& address, bool openConnection = true ); NetworkAddress localAddress; std::map peers; @@ -212,11 +212,9 @@ static_assert( sizeof(ConnectPacket) == CONNECT_PACKET_V2_SIZE, "ConnectPacket p static Future connectionReader( TransportData* const& transport, Reference const& conn, Peer* const& peer, Promise const& onConnected ); -static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable ); +static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ); struct Peer : NonCopyable { - // FIXME: Peers don't die! - TransportData* transport; NetworkAddress destination; UnsentPacketQueue unsent; @@ -229,12 +227,10 @@ struct Peer : NonCopyable { double lastConnectTime; double reconnectionDelay; - explicit Peer( TransportData* transport, NetworkAddress const& destination, bool doConnect = true ) - : transport(transport), destination(destination), outgoingConnectionIdle(!doConnect), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true) + explicit Peer( TransportData* transport, NetworkAddress const& destination ) + : transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true) { - if(doConnect) { - connect = connectionKeeper(this); - } + connect = connectionKeeper(this); } void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) { @@ -422,6 +418,13 @@ struct Peer : NonCopyable { IFailureMonitor::failureMonitor().notifyDisconnect( self->destination ); //< Clients might send more packets in response, which needs to go out on the next connection if (e.code() == error_code_actor_cancelled) throw; // Try to recover, even from serious errors, by retrying + + if(self->reliable.empty() && self->unsent.empty()) { + self->connect.cancel(); + self->transport->peers.erase(self->destination); + delete self; + return Void(); + } } } } @@ -452,7 +455,7 @@ ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReade sendPacket( self, SerializeSource( Endpoint( self->localAddress, destination.token ) ), Endpoint( destination.address, WLTOKEN_ENDPOINT_NOT_FOUND), - false ); + false, true ); } if( inReadSocket ) @@ -734,10 +737,17 @@ ACTOR static Future listen( TransportData* self, NetworkAddress listenAddr } } -Peer* TransportData::getPeer( NetworkAddress const& address, bool doConnect ) { - auto& peer = peers[address]; - if (!peer) peer = new Peer(this, address, doConnect); - return peer; +Peer* TransportData::getPeer( NetworkAddress const& address, bool openConnection ) { + auto peer = peers.find(address); + if (peer != peers.end()) { + return peer->second; + } + if(!openConnection) { + return NULL; + } + Peer* newPeer = new Peer(this, address); + peers[address] = newPeer; + return newPeer; } ACTOR static Future multiVersionCleanupWorker( TransportData* self ) { @@ -821,7 +831,7 @@ void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageRece ASSERT( endpoint.token == otoken ); } -static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable ) { +static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ) { if (destination.address == self->localAddress) { TEST(true); // "Loopback" delivery // SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow? @@ -844,10 +854,10 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c ++self->countPacketsGenerated; - Peer* peer = self->getPeer(destination.address); + Peer* peer = self->getPeer(destination.address, openConnection); // If there isn't an open connection, a public address, or the peer isn't compatible, we can't send - if ((peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) { + if (!peer || (peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) { TEST(true); // Can't send to private address without a compatible open connection return (PacketID)NULL; } @@ -935,7 +945,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c } PacketID FlowTransport::sendReliable( ISerializeSource const& what, const Endpoint& destination ) { - return sendPacket( self, what, destination, true ); + return sendPacket( self, what, destination, true, true ); } void FlowTransport::cancelReliable( PacketID pid ) { @@ -944,8 +954,8 @@ void FlowTransport::cancelReliable( PacketID pid ) { // SOMEDAY: Call reliable.compact() if a lot of memory is wasted in PacketBuffers by formerly reliable packets mixed with a few reliable ones. Don't forget to delref the new PacketBuffers since they are unsent. } -void FlowTransport::sendUnreliable( ISerializeSource const& what, const Endpoint& destination ) { - sendPacket( self, what, destination, false ); +void FlowTransport::sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection ) { + sendPacket( self, what, destination, false, openConnection ); } int FlowTransport::getEndpointCount() { diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index d8d7560a96..4a7209d88a 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -102,7 +102,7 @@ public: // Makes PacketID "unreliable" (either the data or a connection close event will be delivered // eventually). It can still be used safely to send a reply to a "reliable" request. - void sendUnreliable( ISerializeSource const& what, const Endpoint& destination );// { cancelReliable(sendReliable(what,destination)); } + void sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection = true );// { cancelReliable(sendReliable(what,destination)); } int getEndpointCount(); // for tracing only diff --git a/fdbrpc/genericactors.actor.h b/fdbrpc/genericactors.actor.h index 4bb99d8c25..9e72d59a41 100644 --- a/fdbrpc/genericactors.actor.h +++ b/fdbrpc/genericactors.actor.h @@ -84,11 +84,11 @@ ACTOR template void networkSender( Future input, Endpoint endpoint ) { try { T value = wait( input ); - FlowTransport::transport().sendUnreliable( SerializeBoolAnd(true, value), endpoint ); + FlowTransport::transport().sendUnreliable( SerializeBoolAnd(true, value), endpoint, false ); } catch (Error& err) { //if (err.code() == error_code_broken_promise) return; ASSERT( err.code() != error_code_actor_cancelled ); - FlowTransport::transport().sendUnreliable( SerializeBoolAnd(false, err), endpoint ); + FlowTransport::transport().sendUnreliable( SerializeBoolAnd(false, err), endpoint, false ); } }