From 3a5315d10cdae98154b47f5c2f892839eefd3953 Mon Sep 17 00:00:00 2001
From: Vishesh Yadav <vishesh_yadav@apple.com>
Date: Wed, 22 Apr 2020 19:38:01 -0700
Subject: [PATCH] FlowTransport: Don't immediately mark connections failed

In connectionKeeper(), when a connection is failed for
FAILURE_DETECTION_DELAY, then only mark connection as failed.

This is much closer to the original centralized behaviour, and also
adds more confidence on whether the connection is actually failed.
---
 fdbrpc/FailureMonitor.actor.cpp |  3 ---
 fdbrpc/FlowTransport.actor.cpp  | 43 +++++++++++++++++++++++----------
 fdbrpc/FlowTransport.h          |  5 +---
 3 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/fdbrpc/FailureMonitor.actor.cpp b/fdbrpc/FailureMonitor.actor.cpp
index 7d985fe854..fcea9ec014 100644
--- a/fdbrpc/FailureMonitor.actor.cpp
+++ b/fdbrpc/FailureMonitor.actor.cpp
@@ -33,9 +33,6 @@ ACTOR Future<Void> waitForContinuousFailure(IFailureMonitor* monitor, Endpoint e
                                             double sustainedFailureDuration, double slope) {
 	state double startT = now();
 
-	// Since, FailureMonitoring is now localized we should add some slack for `connectionKeeper`
-	// to try reconnecting.
-	sustainedFailureDuration += FLOW_KNOBS->FAILURE_DETECTION_DELAY;
 	loop {
 		wait(monitor->onFailed(endpoint));
 		if (monitor->permanentlyFailed(endpoint)) return Void();
diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp
index 219aafc1a1..915f744bef 100644
--- a/fdbrpc/FlowTransport.actor.cpp
+++ b/fdbrpc/FlowTransport.actor.cpp
@@ -437,6 +437,8 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 	ASSERT_WE_THINK(FlowTransport::transport().getLocalAddress() != self->destination);
 
 	state Optional<double> firstConnFailedTime = Optional<double>();
+	state int retryConnect = false;
+
 	loop {
 		try {
 			state Future<Void> delayedHealthUpdateF = Future<Void>();
@@ -445,12 +447,13 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 				self->outgoingConnectionIdle = true;
 				// Wait until there is something to send.
 				while (self->unsent.empty()) {
-					if (self->destination.isPublic() &&
-					    IFailureMonitor::failureMonitor().getState(self->destination).isFailed()) {
-						break;
-					}
+					// Override waiting, if we are in failed state to update failure monitoring status.
+					Future<Void> retryConnectF = retryConnect ? delay(FLOW_KNOBS->SERVER_REQUEST_INTERVAL) : Never();
 
-					wait (self->dataToSend.onTrigger());
+					choose {
+						when(wait(self->dataToSend.onTrigger())) {}
+						when(wait(retryConnectF)) { break; }
+					}
 				}
 
 				ASSERT(self->destination.isPublic());
@@ -480,6 +483,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 									when(wait(delayedHealthUpdateF)) {
 										conn->close();
 										conn = Reference<IConnection>();
+										retryConnect = false;
 										continue;
 									}
 									when(wait(self->dataToSend.onTrigger())) {}
@@ -546,6 +550,18 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 				firstConnFailedTime = now();
 			}
 
+			// Don't immediately mark connection as failed. To stay closed to earlier behaviour of centralized
+			// failure monitoring, wait until connection stays failed for FLOW_KNOBS->FAILURE_DETECTION_DELAY timeout.
+			retryConnect = self->destination.isPublic() && e.code() == error_code_connection_failed;
+			if (e.code() == error_code_connection_failed) {
+				if (!self->destination.isPublic()) {
+					// Can't connect back to non-public addresses.
+					IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
+				} else if (now() - firstConnFailedTime.get() > FLOW_KNOBS->FAILURE_DETECTION_DELAY) {
+					IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
+				}
+			}
+
 			self->discardUnreliablePackets();
 			reader = Future<Void>();
 			bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
@@ -566,10 +582,6 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 						.detail("PeerAddr", self->destination);
 			}
 
-			if (e.code() == error_code_connection_failed) {
-				IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
-			}
-
 			if(self->destination.isPublic() 
 				&& IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()
 				&& !FlowTransport::transport().isClient()) 
@@ -605,13 +617,20 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
 				TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
 				self->connect.cancel();
 				self->transport->peers.erase(self->destination);
-				IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
 				return Void();
 			}
 		}
 	}
 }
 
+Peer::Peer(TransportData* transport, NetworkAddress const& destination)
+  : transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
+    reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
+    incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {
+
+	IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
+}
+
 void Peer::send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
 	unsent.setWriteBuffer(pb);
 	if (rp) reliable.insert(rp);
@@ -1163,9 +1182,7 @@ void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
 		return;
 
 	Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
-
-	if(peer->peerReferences == -1) {
-		IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
+	if (peer->peerReferences == -1) {
 		peer->peerReferences = 1;
 	} else {
 		peer->peerReferences++;
diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h
index 3fd39cadeb..597fcab626 100644
--- a/fdbrpc/FlowTransport.h
+++ b/fdbrpc/FlowTransport.h
@@ -124,10 +124,7 @@ struct Peer : public ReferenceCounted<Peer> {
 	double lastDataPacketSentTime;
 	int outstandingReplies;
 
-	explicit Peer(TransportData* transport, NetworkAddress const& destination)
-	  : transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
-	    reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
-	    incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {}
+	explicit Peer(TransportData* transport, NetworkAddress const& destination);
 
 	void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);