From c593d1c6a25f96ba38d62155dfa836bc73e9a77c Mon Sep 17 00:00:00 2001 From: Stephen Atherton Date: Fri, 27 Jul 2018 20:42:06 -0700 Subject: [PATCH] Bug fix causing clients to sometimes (rarely) not reconnect to upgraded clusters. Reliable packets were being dropped to incompatible peers intentionally, but now this is only done if the peer is newer since successful communication with a newer peer will never be possible. --- fdbrpc/FlowTransport.actor.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 349f43ecb2..9a821b05ae 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -226,9 +226,10 @@ struct Peer : NonCopyable { bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send double lastConnectTime; double reconnectionDelay; + bool incompatibleProtocolVersionNewer; explicit Peer( TransportData* transport, NetworkAddress const& destination ) - : transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true) + : transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), incompatibleProtocolVersionNewer(false) { connect = connectionKeeper(this); } @@ -422,6 +423,7 @@ struct Peer : NonCopyable { // Try to recover, even from serious errors, by retrying if(self->reliable.empty() && self->unsent.empty()) { + TraceEvent("PeerDestroy").detail("PeerAddr", self->destination).error(e, true); self->connect.cancel(); self->transport->peers.erase(self->destination); delete self; @@ -573,6 +575,7 @@ ACTOR static Future connectionReader( state uint8_t* buffer_end = NULL; state bool expectConnectPacket = true; state bool compatible = false; + state bool incompatibleProtocolVersionNewer = false; state NetworkAddress peerAddress; state uint64_t peerProtocolVersion = 0; @@ -613,7 +616,8 @@ ACTOR static Future connectionReader( connectionId = p->connectionId; } - if( (p->protocolVersion&compatibleProtocolVersionMask) != (currentProtocolVersion&compatibleProtocolVersionMask) ) { + if( (p->protocolVersion & compatibleProtocolVersionMask) != (currentProtocolVersion & compatibleProtocolVersionMask) ) { + incompatibleProtocolVersionNewer = p->protocolVersion > currentProtocolVersion; NetworkAddress addr = p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress(); if(connectionId != 1) addr.port = 0; @@ -659,6 +663,7 @@ ACTOR static Future connectionReader( // Outgoing connection; port information should be what we expect TraceEvent("ConnectedOutgoing").detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) ).suppressFor(1.0); peer->compatible = compatible; + peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer; if (!compatible) peer->transport->numIncompatibleConnections++; ASSERT( p->canonicalRemotePort == peerAddress.port ); @@ -668,6 +673,7 @@ ACTOR static Future connectionReader( } peer = transport->getPeer(peerAddress); peer->compatible = compatible; + peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer; if (!compatible) peer->transport->numIncompatibleConnections++; onConnected.send( peer ); @@ -859,7 +865,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c Peer* peer = self->getPeer(destination.address, openConnection); // If there isn't an open connection, a public address, or the peer isn't compatible, we can't send - if (!peer || (peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) { + if (!peer || (peer->outgoingConnectionIdle && !destination.address.isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) { TEST(true); // Can't send to private address without a compatible open connection return (PacketID)NULL; }