FlowTransport: Don't immediately mark connections failed

In connectionKeeper(), when a connection is failed for
FAILURE_DETECTION_DELAY, then only mark connection as failed.

This is much closer to the original centralized behaviour, and also
adds more confidence on whether the connection is actually failed.
This commit is contained in:
Vishesh Yadav 2020-04-22 19:38:01 -07:00
parent 500a265d20
commit 3a5315d10c
3 changed files with 31 additions and 20 deletions

View File

@ -33,9 +33,6 @@ ACTOR Future<Void> waitForContinuousFailure(IFailureMonitor* monitor, Endpoint e
double sustainedFailureDuration, double slope) {
state double startT = now();
// Since, FailureMonitoring is now localized we should add some slack for `connectionKeeper`
// to try reconnecting.
sustainedFailureDuration += FLOW_KNOBS->FAILURE_DETECTION_DELAY;
loop {
wait(monitor->onFailed(endpoint));
if (monitor->permanentlyFailed(endpoint)) return Void();

View File

@ -437,6 +437,8 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
ASSERT_WE_THINK(FlowTransport::transport().getLocalAddress() != self->destination);
state Optional<double> firstConnFailedTime = Optional<double>();
state int retryConnect = false;
loop {
try {
state Future<Void> delayedHealthUpdateF = Future<Void>();
@ -445,12 +447,13 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
self->outgoingConnectionIdle = true;
// Wait until there is something to send.
while (self->unsent.empty()) {
if (self->destination.isPublic() &&
IFailureMonitor::failureMonitor().getState(self->destination).isFailed()) {
break;
}
// Override waiting, if we are in failed state to update failure monitoring status.
Future<Void> retryConnectF = retryConnect ? delay(FLOW_KNOBS->SERVER_REQUEST_INTERVAL) : Never();
wait (self->dataToSend.onTrigger());
choose {
when(wait(self->dataToSend.onTrigger())) {}
when(wait(retryConnectF)) { break; }
}
}
ASSERT(self->destination.isPublic());
@ -480,6 +483,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
when(wait(delayedHealthUpdateF)) {
conn->close();
conn = Reference<IConnection>();
retryConnect = false;
continue;
}
when(wait(self->dataToSend.onTrigger())) {}
@ -546,6 +550,18 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
firstConnFailedTime = now();
}
// Don't immediately mark connection as failed. To stay closed to earlier behaviour of centralized
// failure monitoring, wait until connection stays failed for FLOW_KNOBS->FAILURE_DETECTION_DELAY timeout.
retryConnect = self->destination.isPublic() && e.code() == error_code_connection_failed;
if (e.code() == error_code_connection_failed) {
if (!self->destination.isPublic()) {
// Can't connect back to non-public addresses.
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
} else if (now() - firstConnFailedTime.get() > FLOW_KNOBS->FAILURE_DETECTION_DELAY) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
}
self->discardUnreliablePackets();
reader = Future<Void>();
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
@ -566,10 +582,6 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
.detail("PeerAddr", self->destination);
}
if (e.code() == error_code_connection_failed) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
if(self->destination.isPublic()
&& IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()
&& !FlowTransport::transport().isClient())
@ -605,13 +617,20 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
return Void();
}
}
}
}
Peer::Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {
IFailureMonitor::failureMonitor().setStatus(destination, FailureStatus(false));
}
void Peer::send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) {
unsent.setWriteBuffer(pb);
if (rp) reliable.insert(rp);
@ -1163,9 +1182,7 @@ void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
return;
Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
if(peer->peerReferences == -1) {
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
if (peer->peerReferences == -1) {
peer->peerReferences = 1;
} else {
peer->peerReferences++;

View File

@ -124,10 +124,7 @@ struct Peer : public ReferenceCounted<Peer> {
double lastDataPacketSentTime;
int outstandingReplies;
explicit Peer(TransportData* transport, NetworkAddress const& destination)
: transport(transport), destination(destination), outgoingConnectionIdle(true), lastConnectTime(0.0),
reconnectionDelay(FLOW_KNOBS->INITIAL_RECONNECTION_TIME), compatible(true), outstandingReplies(0),
incompatibleProtocolVersionNewer(false), peerReferences(-1), bytesReceived(0), lastDataPacketSentTime(now()) {}
explicit Peer(TransportData* transport, NetworkAddress const& destination);
void send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent);