Merge pull request #587 from etschannen/feature-remote-logs

close unneeded connections
This commit is contained in:
Alec Grieser 2018-07-10 13:27:15 -07:00 committed by GitHub
commit d5a23642a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 4 deletions

View File

@ -226,9 +226,10 @@ struct Peer : NonCopyable {
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
double lastConnectTime;
double reconnectionDelay;
int peerReferences;
explicit Peer( TransportData* transport, NetworkAddress const& destination )
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true)
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), peerReferences(-1)
{
connect = connectionKeeper(this);
}
@ -305,6 +306,10 @@ struct Peer : NonCopyable {
state RequestStream< ReplyPromise<Void> > remotePing( Endpoint( peer->destination, WLTOKEN_PING_PACKET ) );
loop {
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
throw connection_failed();
}
Void _ = wait( delayJittered( FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME ) );
// SOMEDAY: Stop monitoring and close the connection after a long period of inactivity with no reliable or onDisconnect requests outstanding
@ -421,7 +426,7 @@ struct Peer : NonCopyable {
if (e.code() == error_code_actor_cancelled) throw;
// Try to recover, even from serious errors, by retrying
if(self->reliable.empty() && self->unsent.empty()) {
if(self->peerReferences <= 0 && self->reliable.empty() && self->unsent.empty()) {
self->connect.cancel();
self->transport->peers.erase(self->destination);
delete self;
@ -809,6 +814,30 @@ void FlowTransport::loadedEndpoint( Endpoint& endpoint ) {
endpoint.address = g_currentDeliveryPeerAddress;
}
void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (!receiver->isStream() || !endpoint.address.isValid()) return;
Peer* peer = self->getPeer(endpoint.address);
if(peer->peerReferences == -1) {
peer->peerReferences = 1;
} else {
peer->peerReferences++;
}
}
void FlowTransport::removePeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (!receiver->isStream() || !endpoint.address.isValid()) return;
Peer* peer = self->getPeer(endpoint.address, false);
if(peer) {
peer->peerReferences--;
if(peer->peerReferences < 0) {
TraceEvent(SevError, "InvalidPeerReferences").detail("References", peer->peerReferences).detail("Address", endpoint.address).detail("Token", endpoint.token);
}
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
peer->incompatibleDataRead.trigger();
}
}
}
void FlowTransport::addEndpoint( Endpoint& endpoint, NetworkMessageReceiver* receiver, uint32_t taskID ) {
endpoint.token = g_random->randomUniqueID();
if (receiver->isStream()) {

View File

@ -83,6 +83,12 @@ public:
std::map<NetworkAddress, std::pair<uint64_t, double>>* getIncompatiblePeers();
// Returns the same of all peers that have attempted to connect, but have incompatible protocol versions
void addPeerReference( const Endpoint&, NetworkMessageReceiver* );
// Signal that a peer connection is being used, even if no messages are currently being sent to the peer
void removePeerReference( const Endpoint&, NetworkMessageReceiver* );
// Signal that a peer connection is no longer being used
void addEndpoint( Endpoint& endpoint, NetworkMessageReceiver*, uint32_t taskID );
// Sets endpoint to be a new local endpoint which delivers messages to the given receiver

View File

@ -34,10 +34,15 @@ struct FlowReceiver : private NetworkMessageReceiver {
bool m_isLocalEndpoint;
FlowReceiver() : m_isLocalEndpoint(false) {}
FlowReceiver(Endpoint const& remoteEndpoint) : endpoint(remoteEndpoint), m_isLocalEndpoint(false) {}
FlowReceiver(Endpoint const& remoteEndpoint) : endpoint(remoteEndpoint), m_isLocalEndpoint(false) {
FlowTransport::transport().addPeerReference(endpoint, this);
}
~FlowReceiver() {
if (m_isLocalEndpoint)
if (m_isLocalEndpoint) {
FlowTransport::transport().removeEndpoint(endpoint, this);
} else {
FlowTransport::transport().removePeerReference(endpoint, this);
}
}
bool isLocalEndpoint() { return m_isLocalEndpoint; }