Bug fix causing clients to sometimes (rarely) not reconnect to upgraded clusters. Reliable packets were being dropped to incompatible peers intentionally, but now this is only done if the peer is newer since successful communication with a newer peer will never be possible.
This commit is contained in:
parent
023799e9db
commit
c593d1c6a2
|
@ -226,9 +226,10 @@ struct Peer : NonCopyable {
|
|||
bool outgoingConnectionIdle; // We don't actually have a connection open and aren't trying to open one because we don't have anything to send
|
||||
double lastConnectTime;
|
||||
double reconnectionDelay;
|
||||
bool incompatibleProtocolVersionNewer;
|
||||
|
||||
explicit Peer( TransportData* transport, NetworkAddress const& destination )
|
||||
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true)
|
||||
: transport(transport), destination(destination), outgoingConnectionIdle(false), lastConnectTime(0.0), reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), incompatibleProtocolVersionNewer(false)
|
||||
{
|
||||
connect = connectionKeeper(this);
|
||||
}
|
||||
|
@ -422,6 +423,7 @@ struct Peer : NonCopyable {
|
|||
// Try to recover, even from serious errors, by retrying
|
||||
|
||||
if(self->reliable.empty() && self->unsent.empty()) {
|
||||
TraceEvent("PeerDestroy").detail("PeerAddr", self->destination).error(e, true);
|
||||
self->connect.cancel();
|
||||
self->transport->peers.erase(self->destination);
|
||||
delete self;
|
||||
|
@ -573,6 +575,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
state uint8_t* buffer_end = NULL;
|
||||
state bool expectConnectPacket = true;
|
||||
state bool compatible = false;
|
||||
state bool incompatibleProtocolVersionNewer = false;
|
||||
state NetworkAddress peerAddress;
|
||||
state uint64_t peerProtocolVersion = 0;
|
||||
|
||||
|
@ -613,7 +616,8 @@ ACTOR static Future<Void> connectionReader(
|
|||
connectionId = p->connectionId;
|
||||
}
|
||||
|
||||
if( (p->protocolVersion&compatibleProtocolVersionMask) != (currentProtocolVersion&compatibleProtocolVersionMask) ) {
|
||||
if( (p->protocolVersion & compatibleProtocolVersionMask) != (currentProtocolVersion & compatibleProtocolVersionMask) ) {
|
||||
incompatibleProtocolVersionNewer = p->protocolVersion > currentProtocolVersion;
|
||||
NetworkAddress addr = p->canonicalRemotePort ? NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) : conn->getPeerAddress();
|
||||
if(connectionId != 1) addr.port = 0;
|
||||
|
||||
|
@ -659,6 +663,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
// Outgoing connection; port information should be what we expect
|
||||
TraceEvent("ConnectedOutgoing").detail("PeerAddr", NetworkAddress( p->canonicalRemoteIp, p->canonicalRemotePort ) ).suppressFor(1.0);
|
||||
peer->compatible = compatible;
|
||||
peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer;
|
||||
if (!compatible)
|
||||
peer->transport->numIncompatibleConnections++;
|
||||
ASSERT( p->canonicalRemotePort == peerAddress.port );
|
||||
|
@ -668,6 +673,7 @@ ACTOR static Future<Void> connectionReader(
|
|||
}
|
||||
peer = transport->getPeer(peerAddress);
|
||||
peer->compatible = compatible;
|
||||
peer->incompatibleProtocolVersionNewer = incompatibleProtocolVersionNewer;
|
||||
if (!compatible)
|
||||
peer->transport->numIncompatibleConnections++;
|
||||
onConnected.send( peer );
|
||||
|
@ -859,7 +865,7 @@ static PacketID sendPacket( TransportData* self, ISerializeSource const& what, c
|
|||
Peer* peer = self->getPeer(destination.address, openConnection);
|
||||
|
||||
// If there isn't an open connection, a public address, or the peer isn't compatible, we can't send
|
||||
if (!peer || (peer->outgoingConnectionIdle && !destination.address.isPublic()) || (!peer->compatible && destination.token != WLTOKEN_PING_PACKET)) {
|
||||
if (!peer || (peer->outgoingConnectionIdle && !destination.address.isPublic()) || (peer->incompatibleProtocolVersionNewer && destination.token != WLTOKEN_PING_PACKET)) {
|
||||
TEST(true); // Can't send to private address without a compatible open connection
|
||||
return (PacketID)NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue