fix: destroy peers that are inactive

do not open new connections to send replies
This commit is contained in:
Evan Tschannen 2018-07-08 10:26:41 -07:00
parent 4dd18afb84
commit 0e97ce79b4
3 changed files with 33 additions and 23 deletions

View File

@ -157,7 +157,7 @@ public:
countConnClosedWithoutError.init(LiteralStringRef("Net2.CountConnClosedWithoutError"));
}
struct Peer* getPeer( NetworkAddress const& address, bool doConnect = true );
struct Peer* getPeer( NetworkAddress const& address, bool openConnection = true );
NetworkAddress localAddress;
std::map<NetworkAddress, struct Peer*> peers;
@ -212,11 +212,9 @@ static_assert( sizeof(ConnectPacket) == CONNECT_PACKET_V2_SIZE, "ConnectPacket p
static Future<Void> connectionReader( TransportData* const& transport, Reference<IConnection> const& conn, Peer* const& peer, Promise<Peer*> const& onConnected );
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable );
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection );
struct Peer : NonCopyable {
// FIXME: Peers don't die!
TransportData* transport;
NetworkAddress destination;
UnsentPacketQueue unsent;
@ -229,12 +227,10 @@ struct Peer : NonCopyable {
double lastConnectTime;
double reconnectionDelay;
explicit Peer( TransportData* transport, NetworkAddress const& destination, bool doConnect = true )
: transport(transport), destination(destination), outgoingConnectionIdle(!doConnect), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true)
explicit Peer( TransportData* transport, NetworkAddress const& destination )
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true)
{
if(doConnect) {
connect = connectionKeeper(this);
}
connect = connectionKeeper(this);
}
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
@ -422,6 +418,13 @@ struct Peer : NonCopyable {
IFailureMonitor::failureMonitor().notifyDisconnect( self->destination ); //< Clients might send more packets in response, which needs to go out on the next connection
if (e.code() == error_code_actor_cancelled) throw;
// Try to recover, even from serious errors, by retrying
if(self->reliable.empty() && self->unsent.empty()) {
self->connect.cancel();
self->transport->peers.erase(self->destination);
delete self;
return Void();
}
}
}
}
@ -452,7 +455,7 @@ ACTOR static void deliver( TransportData* self, Endpoint destination, ArenaReade
sendPacket( self,
SerializeSource<Endpoint>( Endpoint( self->localAddress, destination.token ) ),
Endpoint( destination.address, WLTOKEN_ENDPOINT_NOT_FOUND),
false );
false, true );
}
if( inReadSocket )
@ -734,10 +737,17 @@ ACTOR static Future<Void> listen( TransportData* self, NetworkAddress listenAddr
}
}
Peer* TransportData::getPeer( NetworkAddress const& address, bool doConnect ) {
auto& peer = peers[address];
if (!peer) peer = new Peer(this, address, doConnect);
return peer;
Peer* TransportData::getPeer( NetworkAddress const& address, bool openConnection ) {
auto peer = peers.find(address);
if (peer != peers.end()) {
return peer->second;
}
if(!openConnection) {
return NULL;
}
Peer* newPeer = new Peer(this, address);
peers[address] = newPeer;
return newPeer;
}
ACTOR static Future<Void> multiVersionCleanupWorker( TransportData* self ) {
@ -821,7 +831,7 @@ void FlowTransport::addWellKnownEndpoint( Endpoint& endpoint, NetworkMessageRece
ASSERT( endpoint.token == otoken );
}
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable ) {
static PacketID sendPacket( TransportData* self, ISerializeSource const& what, const Endpoint& destination, bool reliable, bool openConnection ) {
if (destination.address == self->localAddress) {
TEST(true); // "Loopback" delivery
// SOMEDAY: Would it be better to avoid (de)serialization by doing this check in flow?
@ -844,10 +854,10 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
++self->countPacketsGenerated;
Peer* peer = self->getPeer(destination.address);
Peer* peer = self->getPeer(destination.address, openConnection);
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
if ((peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) {
if (!peer || (peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) {
TEST(true); // Can't send to private address without a compatible open connection
return (PacketID)NULL;
}
@ -935,7 +945,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
}
PacketID FlowTransport::sendReliable( ISerializeSource const& what, const Endpoint& destination ) {
return sendPacket( self, what, destination, true );
return sendPacket( self, what, destination, true, true );
}
void FlowTransport::cancelReliable( PacketID pid ) {
@ -944,8 +954,8 @@ void FlowTransport::cancelReliable( PacketID pid ) {
// SOMEDAY: Call reliable.compact() if a lot of memory is wasted in PacketBuffers by formerly reliable packets mixed with a few reliable ones. Don't forget to delref the new PacketBuffers since they are unsent.
}
void FlowTransport::sendUnreliable( ISerializeSource const& what, const Endpoint& destination ) {
sendPacket( self, what, destination, false );
void FlowTransport::sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection ) {
sendPacket( self, what, destination, false, openConnection );
}
int FlowTransport::getEndpointCount() {

View File

@ -102,7 +102,7 @@ public:
// Makes PacketID "unreliable" (either the data or a connection close event will be delivered
// eventually). It can still be used safely to send a reply to a "reliable" request.
void sendUnreliable( ISerializeSource const& what, const Endpoint& destination );// { cancelReliable(sendReliable(what,destination)); }
void sendUnreliable( ISerializeSource const& what, const Endpoint& destination, bool openConnection = true );// { cancelReliable(sendReliable(what,destination)); }
int getEndpointCount();
// for tracing only

View File

@ -84,11 +84,11 @@ ACTOR template <class T>
void networkSender( Future<T> input, Endpoint endpoint ) {
try {
T value = wait( input );
FlowTransport::transport().sendUnreliable( SerializeBoolAnd<T>(true, value), endpoint );
FlowTransport::transport().sendUnreliable( SerializeBoolAnd<T>(true, value), endpoint, false );
} catch (Error& err) {
//if (err.code() == error_code_broken_promise) return;
ASSERT( err.code() != error_code_actor_cancelled );
FlowTransport::transport().sendUnreliable( SerializeBoolAnd<Error>(false, err), endpoint );
FlowTransport::transport().sendUnreliable( SerializeBoolAnd<Error>(false, err), endpoint, false );
}
}