Fix the delayed health updater so that an address is marked as failed when the too-many-connections-closed check triggers before a new connection starts, instead of after

This commit is contained in:
Josh Slocum 2022-03-11 16:50:23 -06:00
parent 82eef0c7f7
commit a2bbb188c3
1 changed file with 18 additions and 13 deletions

View File

@ -545,28 +545,20 @@ ACTOR Future<Void> connectionWriter(Reference<Peer> self, Reference<IConnection>
}
}
ACTOR Future<Void> delayedHealthUpdate(NetworkAddress address) {
ACTOR Future<Void> delayedHealthUpdate(NetworkAddress address, bool* tooManyConnectionsClosed) {
state double start = now();
state bool delayed = false;
loop {
if (FLOW_KNOBS->HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS &&
FlowTransport::transport().healthMonitor()->tooManyConnectionsClosed(address) && address.isPublic()) {
if (!delayed) {
TraceEvent("TooManyConnectionsClosedMarkFailed")
.detail("Dest", address)
.detail("StartTime", start)
.detail("ClosedCount", FlowTransport::transport().healthMonitor()->closedConnectionsCount(address));
IFailureMonitor::failureMonitor().setStatus(address, FailureStatus(true));
}
delayed = true;
wait(delayJittered(FLOW_KNOBS->MAX_RECONNECTION_TIME * 2.0));
} else {
if (delayed) {
if (*tooManyConnectionsClosed) {
TraceEvent("TooManyConnectionsClosedMarkAvailable")
.detail("Dest", address)
.detail("StartTime", start)
.detail("TimeElapsed", now() - start)
.detail("ClosedCount", FlowTransport::transport().healthMonitor()->closedConnectionsCount(address));
*tooManyConnectionsClosed = false;
}
IFailureMonitor::failureMonitor().setStatus(address, FailureStatus(false));
break;
@ -586,6 +578,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
state Future<Void> delayedHealthUpdateF;
state Optional<double> firstConnFailedTime = Optional<double>();
state int retryConnect = false;
state bool tooManyConnectionsClosed = false;
loop {
try {
@ -635,7 +628,8 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
delayedHealthUpdateF = delayedHealthUpdate(self->destination);
delayedHealthUpdateF =
delayedHealthUpdate(self->destination, &tooManyConnectionsClosed);
choose {
when(wait(delayedHealthUpdateF)) {
conn->close();
@ -675,7 +669,7 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
try {
self->transport->countConnEstablished++;
if (!delayedHealthUpdateF.isValid())
delayedHealthUpdateF = delayedHealthUpdate(self->destination);
delayedHealthUpdateF = delayedHealthUpdate(self->destination, &tooManyConnectionsClosed);
wait(connectionWriter(self, conn) || reader || connectionMonitor(self) ||
self->resetConnection.onTrigger());
TraceEvent("ConnectionReset", conn ? conn->getDebugID() : UID())
@ -761,6 +755,17 @@ ACTOR Future<Void> connectionKeeper(Reference<Peer> self,
if (conn) {
if (self->destination.isPublic() && e.code() == error_code_connection_failed) {
FlowTransport::transport().healthMonitor()->reportPeerClosed(self->destination);
if (FLOW_KNOBS->HEALTH_MONITOR_MARK_FAILED_UNSTABLE_CONNECTIONS &&
FlowTransport::transport().healthMonitor()->tooManyConnectionsClosed(self->destination) &&
self->destination.isPublic()) {
TraceEvent("TooManyConnectionsClosedMarkFailed")
.detail("Dest", self->destination)
.detail(
"ClosedCount",
FlowTransport::transport().healthMonitor()->closedConnectionsCount(self->destination));
tooManyConnectionsClosed = true;
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
}
conn->close();