failMon: For clients remove expireFailure and report failures only during connect

This commit is contained in:
Vishesh Yadav 2019-05-21 16:39:08 -07:00
parent 6fa7081a21
commit 4316ef9ec6
4 changed files with 24 additions and 55 deletions

View File

@ -570,11 +570,6 @@ ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<Cluster
choose {
when( ClientDBInfo ni = wait( clusterInterface->get().present() ? brokenPromiseToNever( clusterInterface->get().get().openDatabase.getReply( req ) ) : Never() ) ) {
TraceEvent("ClientInfoChange").detail("ChangeID", ni.id);
if (FlowTransport::transport().isClient()) {
for (const auto& proxy : ni.proxies) {
IFailureMonitor::failureMonitor().setStatus(proxy.address(), FailureStatus(false));
}
}
outInfo->set(ni);
}
when( wait( clusterInterface->onChange() ) ) {

View File

@ -62,34 +62,13 @@ Future<Void> IFailureMonitor::onFailedFor( Endpoint const& endpoint, double sust
return waitForContinuousFailure( this, endpoint, sustainedFailureDuration, slope );
}
ACTOR Future<Void> expireFailedDelayedMonitor(std::map<NetworkAddress, double>* expireMap) {
state std::set<NetworkAddress> toRemove;
loop {
for (const auto& p : *expireMap) {
if (p.second <= now()) {
toRemove.insert(p.first);
}
}
for (const auto& addr : toRemove) {
expireMap->erase(addr);
IFailureMonitor::failureMonitor().setStatus(addr, FailureStatus(false));
}
toRemove.clear();
wait(delay(2));
}
}
SimpleFailureMonitor::SimpleFailureMonitor()
: endpointKnownFailed() {
if (FlowTransport::transport().isClient())
expireMonitor = expireFailedDelayedMonitor(&expireMap);
//
}
SimpleFailureMonitor::~SimpleFailureMonitor() {
expireMonitor.cancel();
//
}
void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStatus const& status ) {
@ -189,9 +168,4 @@ bool SimpleFailureMonitor::permanentlyFailed( Endpoint const& endpoint ) {
void SimpleFailureMonitor::reset() {
addressStatus = std::unordered_map< NetworkAddress, FailureStatus >();
endpointKnownFailed.resetNoWaiting();
expireMap.clear();
}
void SimpleFailureMonitor::expireFailedDelayed(const NetworkAddress& address) {
expireMap[address] = now() + 60;
}

View File

@ -120,9 +120,6 @@ public:
// Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration + (elapsedTime*sustainedFailureSlope)
Future<Void> onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 );
// Expires failed status of peers after certain delay.
virtual void expireFailedDelayed(NetworkAddress const& address) = 0;
// Returns the failure monitor that the calling machine should use
static IFailureMonitor& failureMonitor() { return *static_cast<IFailureMonitor*>((void*) g_network->global(INetwork::enFailureMonitor)); }
};
@ -130,6 +127,7 @@ public:
// SimpleFailureMonitor is the sole implementation of IFailureMonitor. It has no
// failure detection logic; it just implements the interface and reacts to setStatus() etc.
// Initially all addresses are considered failed, but all endpoints of a non-failed address are considered OK.
class SimpleFailureMonitor : public IFailureMonitor {
public:
SimpleFailureMonitor();
@ -144,16 +142,12 @@ public:
virtual Future<Void> onDisconnectOrFailure( Endpoint const& endpoint );
virtual bool onlyEndpointFailed( Endpoint const& endpoint );
virtual bool permanentlyFailed( Endpoint const& endpoint );
virtual void expireFailedDelayed(NetworkAddress const& address);
void reset();
private:
std::unordered_map< NetworkAddress, FailureStatus > addressStatus;
YieldedAsyncMap< Endpoint, bool > endpointKnownFailed;
std::map<NetworkAddress, double> expireMap;
Future<Void> expireMonitor;
friend class OnStateChangedActorActor;
};

View File

@ -465,21 +465,23 @@ struct Peer : NonCopyable {
TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID())
.detail("PeerAddr", self->destination)
.detail("ConnSet", (bool)conn);
// This is used only at client side and is used to override waiting for unsent data to update failure monitoring status. Overriding is only
// useful when making first connection ever, or an existing connection fails. If `connect` itself fails, it is clear peer is down anyway.
state bool clientReconnectDelay = false;
loop {
try {
if (!conn) { // Always, except for the first loop with an incoming connection
self->outgoingConnectionIdle = true;
// Wait until there is something to send
// Wait until there is something to send.
while ( self->unsent.empty() ) {
Future<Void> clientReconnectDelayF = FlowTransport::transport().isClient() && self->destination.isPublic() && clientReconnectDelay
? delay(0)
: Never();
choose {
when (wait( self->dataToSend.onTrigger() )) {
//
}
when (wait(FlowTransport::transport().isClient() ? delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now())) : Never())) {
if (self->destination.isPublic())
break;
}
when (wait( self->dataToSend.onTrigger() )) { }
when (wait( clientReconnectDelayF )) { break; }
}
}
@ -497,6 +499,7 @@ struct Peer : NonCopyable {
}
if (self->unsent.empty()) {
_conn->close();
clientReconnectDelay = false;
continue;
} else {
conn = _conn;
@ -505,6 +508,9 @@ struct Peer : NonCopyable {
}
} else {
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
throw connection_failed();
}
@ -544,10 +550,6 @@ struct Peer : NonCopyable {
TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
}
if (FlowTransport::transport().isClient() && e.code() == error_code_connection_failed) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) {
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
@ -560,6 +562,9 @@ struct Peer : NonCopyable {
}
if (conn) {
if (FlowTransport::transport().isClient()) {
clientReconnectDelay = true;
}
conn->close();
conn = Reference<IConnection>();
}
@ -571,9 +576,6 @@ struct Peer : NonCopyable {
TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination);
self->connect.cancel();
self->transport->peers.erase(self->destination);
if (FlowTransport::transport().isClient() && e.code() == error_code_connection_failed) {
IFailureMonitor::failureMonitor().expireFailedDelayed(self->destination);
}
delete self;
return Void();
}
@ -1015,6 +1017,10 @@ Endpoint FlowTransport::loadedEndpoint( const UID& token ) {
}
void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
}
if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return;
Peer* peer = self->getPeer(endpoint.getPrimaryAddress());
if(peer->peerReferences == -1) {