diff --git a/fdbrpc/FailureMonitor.actor.cpp b/fdbrpc/FailureMonitor.actor.cpp index 7d985fe854..fcea9ec014 100644 --- a/fdbrpc/FailureMonitor.actor.cpp +++ b/fdbrpc/FailureMonitor.actor.cpp @@ -33,9 +33,6 @@ ACTOR Future waitForContinuousFailure(IFailureMonitor* monitor, Endpoint e double sustainedFailureDuration, double slope) { state double startT = now(); - // Since, FailureMonitoring is now localized we should add some slack for `connectionKeeper` - // to try reconnecting. - sustainedFailureDuration += FLOW_KNOBS->FAILURE_DETECTION_DELAY; loop { wait(monitor->onFailed(endpoint)); if (monitor->permanentlyFailed(endpoint)) return Void(); diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 219aafc1a1..915f744bef 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -437,6 +437,8 @@ ACTOR Future connectionKeeper( Reference self, ASSERT_WE_THINK(FlowTransport::transport().getLocalAddress() != self->destination); state Optional firstConnFailedTime = Optional(); + state int retryConnect = false; + loop { try { state Future delayedHealthUpdateF = Future(); @@ -445,12 +447,13 @@ ACTOR Future connectionKeeper( Reference self, self->outgoingConnectionIdle = true; // Wait until there is something to send. while (self->unsent.empty()) { - if (self->destination.isPublic() && - IFailureMonitor::failureMonitor().getState(self->destination).isFailed()) { - break; - } + // Override waiting, if we are in failed state to update failure monitoring status. + Future retryConnectF = retryConnect ? 
delay(FLOW_KNOBS->SERVER_REQUEST_INTERVAL) : Never(); - wait (self->dataToSend.onTrigger()); + choose { + when(wait(self->dataToSend.onTrigger())) {} + when(wait(retryConnectF)) { break; } + } } ASSERT(self->destination.isPublic()); @@ -480,6 +483,7 @@ ACTOR Future connectionKeeper( Reference self, when(wait(delayedHealthUpdateF)) { conn->close(); conn = Reference(); + retryConnect = false; continue; } when(wait(self->dataToSend.onTrigger())) {} @@ -546,6 +550,18 @@ ACTOR Future connectionKeeper( Reference self, firstConnFailedTime = now(); } + // Don't immediately mark connection as failed. To stay close to earlier behaviour of centralized + // failure monitoring, wait until connection stays failed for FLOW_KNOBS->FAILURE_DETECTION_DELAY timeout. + retryConnect = self->destination.isPublic() && e.code() == error_code_connection_failed; + if (e.code() == error_code_connection_failed) { + if (!self->destination.isPublic()) { + // Can't connect back to non-public addresses. + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); + } else if (now() - firstConnFailedTime.get() > FLOW_KNOBS->FAILURE_DETECTION_DELAY) { + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); + } + } + self->discardUnreliablePackets(); reader = Future(); bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || @@ -566,10 +582,6 @@ ACTOR Future connectionKeeper( Reference self, .detail("PeerAddr", self->destination); } - if (e.code() == error_code_connection_failed) { - IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); - } - if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable() && !FlowTransport::transport().isClient()) @@ -605,13 +617,20 @@ ACTOR Future connectionKeeper( Reference self, TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination); self->connect.cancel(); 
self->transport->peers.erase(self->destination); - IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); return Void(); } } } } +Peer::Peer(TransportData* transport, NetworkAddress const& destination) + : transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0), + reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0), + incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) { + + IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false)); +} + void Peer::send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) { unsent.setWriteBuffer(pb); if (rp) reliable.insert(rp); @@ -1163,9 +1182,7 @@ void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) { return; Reference peer = self->getOrOpenPeer(endpoint.getPrimaryAddress()); - - if(peer->peerReferences == -1) { - IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false)); + if (peer->peerReferences == -1) { peer->peerReferences = 1; } else { peer->peerReferences++; diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 3fd39cadeb..597fcab626 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -124,10 +124,7 @@ struct Peer : public ReferenceCounted { double lastDataPacketSentTime; int outstandingReplies; - explicit Peer(TransportData* transport, NetworkAddress const& destination) - : transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0), - reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0), - incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {} + explicit Peer(TransportData* transport, NetworkAddress const& destination); void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);