From 6b4d30c3ae4bfdcb24abc7940d2325a034d9ef61 Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Thu, 18 Apr 2019 14:12:45 -0700 Subject: [PATCH 1/5] failmon: Identify client vs server when starting failure monitoring client --- fdbclient/FailureMonitorClient.actor.cpp | 1 + fdbrpc/FlowTransport.actor.cpp | 10 ++++++++-- fdbrpc/FlowTransport.h | 6 +++++- fdbserver/SimulatedCluster.actor.cpp | 4 ++-- fdbserver/fdbserver.actor.cpp | 2 +- flow/network.h | 2 +- 6 files changed, 18 insertions(+), 7 deletions(-) diff --git a/fdbclient/FailureMonitorClient.actor.cpp b/fdbclient/FailureMonitorClient.actor.cpp index cd51792fcb..86d6431248 100644 --- a/fdbclient/FailureMonitorClient.actor.cpp +++ b/fdbclient/FailureMonitorClient.actor.cpp @@ -169,6 +169,7 @@ ACTOR Future failureMonitorClientLoop( ACTOR Future failureMonitorClient( Reference>> ci, bool trackMyStatus ) { state SimpleFailureMonitor* monitor = static_cast( &IFailureMonitor::failureMonitor() ); state Reference fmState = Reference(new FailureMonitorClientState()); + TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient()); auto localAddr = g_network->getLocalAddresses(); monitor->setStatus(localAddr.address, FailureStatus(false)); if(localAddr.secondaryAddress.present()) { diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 52750230b2..4206af0399 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -1182,9 +1182,15 @@ bool FlowTransport::incompatibleOutgoingConnectionsPresent() { return self->numIncompatibleConnections > 0; } -void FlowTransport::createInstance( uint64_t transportId ) +void FlowTransport::createInstance( bool isClient, uint64_t transportId ) { - g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); + if (isClient) { + g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); + g_network->setGlobal(INetwork::enClientFailureMonitor, (flowGlobalType)1); + } else { + g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); + g_network->setGlobal(INetwork::enClientFailureMonitor, nullptr); + } g_network->setGlobal(INetwork::enFlowTransport, (flowGlobalType) new FlowTransport(transportId)); g_network->setGlobal(INetwork::enNetworkAddressFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddress); g_network->setGlobal(INetwork::enNetworkAddressesFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddresses); diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 5ff872821e..e7a2302af2 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -109,10 +109,14 @@ public: FlowTransport(uint64_t transportId); ~FlowTransport(); - static void createInstance(uint64_t transportId = 0); + static void createInstance(bool isClient, uint64_t transportId = 0); // Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global() variables, // so it will be private to a simulation. + static bool isClient() { + return g_network->global(INetwork::enClientFailureMonitor) != nullptr; + } + void initMetrics(); // Metrics must be initialized after FlowTransport::createInstance has been called diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 25650a8dc7..fdcdb5061d 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -240,7 +240,7 @@ ACTOR Future simulatedFDBDRebooter(Referenceregister_network(); @@ -1402,7 +1402,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", ""), TaskDefaultYield)); Sim2FileSystem::newFileSystem(); - FlowTransport::createInstance(1); + FlowTransport::createInstance(true, 1); if (tlsOptions->enabled()) { simInitTLS(tlsOptions); } diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index a13e2c815d..e0a35c7c7e 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -1523,7 +1523,7 @@ int main(int argc, char* argv[]) { openTraceFile(NetworkAddress(), rollsize, maxLogsSize, logFolder, "trace", logGroup); } else { g_network = newNet2(useThreadPool, true, useObjectSerializer); - FlowTransport::createInstance(1); + FlowTransport::createInstance(false, 1); const bool expectsPublicAddress = (role == FDBD || role == NetworkTestServer || role == Restore); if (publicAddressStrs.empty()) { diff --git a/flow/network.h b/flow/network.h index 4b33c9c8c9..536e2f9ca2 100644 --- a/flow/network.h +++ b/flow/network.h @@ -353,7 +353,7 @@ public: enum enumGlobal { enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3, enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10, - enNetworkAddressesFunc = 11 + enNetworkAddressesFunc = 11, enClientFailureMonitor = 12 }; virtual void longTaskCheck( const char* name ) {} From 6fa7081a216fdff251f9125c74cef7da1d9948c1 Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Mon, 20 May 2019 11:54:46 -0700 Subject: [PATCH 2/5] net: Don't make FailureMonitoring requests from client This patch removes the need for clients to continuously contact cluster coordinator for failure monitoring information. Instead, it uses the FlowTransport to monitor the statuses of peers and update FailureMonitor accordingly. --- fdbclient/FailureMonitorClient.actor.cpp | 6 ++- fdbclient/NativeAPI.actor.cpp | 5 +++ fdbrpc/FailureMonitor.actor.cpp | 35 ++++++++++++++++++ fdbrpc/FailureMonitor.h | 19 ++++++++-- fdbrpc/FlowTransport.actor.cpp | 47 +++++++++++++++++++----- fdbserver/ClusterController.actor.cpp | 2 +- flow/error_definitions.h | 1 + 7 files changed, 100 insertions(+), 15 deletions(-) diff --git a/fdbclient/FailureMonitorClient.actor.cpp b/fdbclient/FailureMonitorClient.actor.cpp index 86d6431248..6c2f3015b3 100644 --- a/fdbclient/FailureMonitorClient.actor.cpp +++ b/fdbclient/FailureMonitorClient.actor.cpp @@ -167,9 +167,13 @@ ACTOR Future failureMonitorClientLoop( } ACTOR Future failureMonitorClient( Reference>> ci, bool trackMyStatus ) { + TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient()); + if (FlowTransport::transport().isClient()) { + wait (Never()); + } + state SimpleFailureMonitor* monitor = static_cast( &IFailureMonitor::failureMonitor() ); state Reference fmState = Reference(new FailureMonitorClientState()); - TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient()); auto localAddr = g_network->getLocalAddresses(); monitor->setStatus(localAddr.address, FailureStatus(false)); if(localAddr.secondaryAddress.present()) { diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index e6d708aefe..9155851765 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -570,6 +570,11 @@ ACTOR static Future monitorClientInfo( Referenceget().present() ? brokenPromiseToNever( clusterInterface->get().get().openDatabase.getReply( req ) ) : Never() ) ) { TraceEvent("ClientInfoChange").detail("ChangeID", ni.id); + if (FlowTransport::transport().isClient()) { + for (const auto& proxy : ni.proxies) { + IFailureMonitor::failureMonitor().setStatus(proxy.address(), FailureStatus(false)); + } + } outInfo->set(ni); } when( wait( clusterInterface->onChange() ) ) { diff --git a/fdbrpc/FailureMonitor.actor.cpp b/fdbrpc/FailureMonitor.actor.cpp index 45ec1d432a..5c75045118 100644 --- a/fdbrpc/FailureMonitor.actor.cpp +++ b/fdbrpc/FailureMonitor.actor.cpp @@ -62,6 +62,36 @@ Future IFailureMonitor::onFailedFor( Endpoint const& endpoint, double sust return waitForContinuousFailure( this, endpoint, sustainedFailureDuration, slope ); } +ACTOR Future expireFailedDelayedMonitor(std::map* expireMap) { + state std::set toRemove; + loop { + for (const auto& p : *expireMap) { + if (p.second <= now()) { + toRemove.insert(p.first); + } + } + + for (const auto& addr : toRemove) { + expireMap->erase(addr); + IFailureMonitor::failureMonitor().setStatus(addr, FailureStatus(false)); + } + + toRemove.clear(); + wait(delay(2)); + } +} + +SimpleFailureMonitor::SimpleFailureMonitor() + : endpointKnownFailed() { + + if (FlowTransport::transport().isClient()) + expireMonitor = expireFailedDelayedMonitor(&expireMap); +} + +SimpleFailureMonitor::~SimpleFailureMonitor() { + expireMonitor.cancel(); +} + void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStatus const& status ) { //if (status.failed) @@ -159,4 +189,9 @@ bool SimpleFailureMonitor::permanentlyFailed( Endpoint const& endpoint ) { void SimpleFailureMonitor::reset() { addressStatus = std::unordered_map< NetworkAddress, FailureStatus >(); endpointKnownFailed.resetNoWaiting(); + expireMap.clear(); +} + +void SimpleFailureMonitor::expireFailedDelayed(const NetworkAddress& address) { + expireMap[address] = now() + 60; } diff --git a/fdbrpc/FailureMonitor.h b/fdbrpc/FailureMonitor.h index 616b5d64fb..ece182f312 100644 --- a/fdbrpc/FailureMonitor.h +++ b/fdbrpc/FailureMonitor.h @@ -105,6 +105,9 @@ public: // Called by FlowTransport when a connection closes and a prior request or reply might be lost virtual void notifyDisconnect( NetworkAddress const& ) = 0; + // Called to update the failure status of network address directly when running client. + virtual void setStatus( NetworkAddress const& address, FailureStatus const& status ) = 0; + // Returns when the known status of endpoint is next equal to status. Returns immediately // if appropriate. Future onStateEqual( Endpoint const& endpoint, FailureStatus status ); @@ -114,11 +117,14 @@ public: return onStateEqual( endpoint, FailureStatus() ); } - static IFailureMonitor& failureMonitor() { return *static_cast((void*) g_network->global(INetwork::enFailureMonitor)); } - // Returns the failure monitor that the calling machine should use - // Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration + (elapsedTime*sustainedFailureSlope) Future onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 ); + + // Expires failed status of peers after certain delay. + virtual void expireFailedDelayed(NetworkAddress const& address) = 0; + + // Returns the failure monitor that the calling machine should use + static IFailureMonitor& failureMonitor() { return *static_cast((void*) g_network->global(INetwork::enFailureMonitor)); } }; // SimpleFailureMonitor is the sole implementation of IFailureMonitor. It has no @@ -126,7 +132,8 @@ public: // Initially all addresses are considered failed, but all endpoints of a non-failed address are considered OK. class SimpleFailureMonitor : public IFailureMonitor { public: - SimpleFailureMonitor() : endpointKnownFailed() { } + SimpleFailureMonitor(); + ~SimpleFailureMonitor(); void setStatus( NetworkAddress const& address, FailureStatus const& status ); void endpointNotFound( Endpoint const& ); virtual void notifyDisconnect( NetworkAddress const& ); @@ -137,12 +144,16 @@ public: virtual Future onDisconnectOrFailure( Endpoint const& endpoint ); virtual bool onlyEndpointFailed( Endpoint const& endpoint ); virtual bool permanentlyFailed( Endpoint const& endpoint ); + virtual void expireFailedDelayed(NetworkAddress const& address); void reset(); private: std::unordered_map< NetworkAddress, FailureStatus > addressStatus; YieldedAsyncMap< Endpoint, bool > endpointKnownFailed; + std::map expireMap; + Future expireMonitor; + friend class OnStateChangedActorActor; }; diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 4206af0399..3ddc2aee75 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -394,7 +394,7 @@ struct Peer : NonCopyable { loop { if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { - throw connection_failed(); + throw connection_unreferenced(); } wait( delayJittered( FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME ) ); @@ -470,19 +470,39 @@ struct Peer : NonCopyable { if (!conn) { // Always, except for the first loop with an incoming connection self->outgoingConnectionIdle = true; // Wait until there is something to send - while ( self->unsent.empty() ) - wait( self->dataToSend.onTrigger() ); + + while ( self->unsent.empty() ) { + choose { + when (wait( self->dataToSend.onTrigger() )) { + // + } + when (wait(FlowTransport::transport().isClient() ? delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now())) : Never())) { + if (self->destination.isPublic()) + break; + } + } + } + ASSERT( self->destination.isPublic() ); self->outgoingConnectionIdle = false; - wait( delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now()) ) ); // Don't connect() to the same peer more than once per 2 sec + if (!FlowTransport::transport().isClient()) + wait( delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now()) ) ); // Don't connect() to the same peer more than once per 2 sec self->lastConnectTime = now(); TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); Reference _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference() ) ); if (_conn) { - conn = _conn; - TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).suppressFor(1.0).detail("PeerAddr", self->destination); - self->prependConnectPacket(); + if (FlowTransport::transport().isClient()) { + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false)); + } + if (self->unsent.empty()) { + _conn->close(); + continue; + } else { + conn = _conn; + TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).suppressFor(1.0).detail("PeerAddr", self->destination); + self->prependConnectPacket(); + } } else { TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); throw connection_failed(); @@ -497,7 +517,8 @@ struct Peer : NonCopyable { self->transport->countConnEstablished++; wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) ); } catch (Error& e) { - if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed )) + if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || e.code() == error_code_connection_unreferenced || + ( g_network->isSimulated() && e.code() == error_code_checksum_failed)) self->transport->countConnClosedWithoutError++; else self->transport->countConnClosedWithError++; @@ -513,7 +534,8 @@ struct Peer : NonCopyable { } self->discardUnreliablePackets(); reader = Future(); - bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed ); + bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || e.code() == error_code_connection_unreferenced || + ( g_network->isSimulated() && e.code() == error_code_checksum_failed ); if(self->compatible) { TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination); @@ -522,6 +544,10 @@ struct Peer : NonCopyable { TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination); } + if (FlowTransport::transport().isClient() && e.code() == error_code_connection_failed) { + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); + } + if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) { auto& it = self->transport->closedPeers[self->destination]; if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) { @@ -545,6 +571,9 @@ struct Peer : NonCopyable { TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination); self->connect.cancel(); self->transport->peers.erase(self->destination); + if (FlowTransport::transport().isClient() && e.code() == error_code_connection_failed) { + IFailureMonitor::failureMonitor().expireFailedDelayed(self->destination); + } delete self; return Void(); } diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 5d4243f581..a6627e8a82 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1499,7 +1499,7 @@ ACTOR Future rebootAndCheck( ClusterControllerData* cluster, Optional workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) { - state Future failed = worker.address() == g_network->getLocalAddress() ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME ); + state Future failed = (worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass) ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME ); cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) ); // This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch fails for the worker. wait(delay(0)); diff --git a/flow/error_definitions.h b/flow/error_definitions.h index 37ef476fb2..25f000935b 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -68,6 +68,7 @@ ERROR( serialization_failed, 1044, "Failed to deserialize an object" ) ERROR( transaction_not_permitted, 1045, "Operation not permitted") ERROR( cluster_not_fully_recovered, 1046, "Cluster not fully recovered") ERROR( txn_exec_log_anti_quorum, 1047, "Execute Transaction not supported when log anti quorum is configured") +ERROR( connection_unreferenced, 1048, "No peer references for connection" ) ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" ) From 4316ef9ec6adbf415d93889a0566b966b97a6460 Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Tue, 21 May 2019 16:39:08 -0700 Subject: [PATCH 3/5] failMon: For clients remove expireFailure and report failures only during connect --- fdbclient/NativeAPI.actor.cpp | 5 ----- fdbrpc/FailureMonitor.actor.cpp | 30 ++------------------------- fdbrpc/FailureMonitor.h | 8 +------- fdbrpc/FlowTransport.actor.cpp | 36 +++++++++++++++++++-------------- 4 files changed, 24 insertions(+), 55 deletions(-) diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index 9155851765..e6d708aefe 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -570,11 +570,6 @@ ACTOR static Future monitorClientInfo( Referenceget().present() ? brokenPromiseToNever( clusterInterface->get().get().openDatabase.getReply( req ) ) : Never() ) ) { TraceEvent("ClientInfoChange").detail("ChangeID", ni.id); - if (FlowTransport::transport().isClient()) { - for (const auto& proxy : ni.proxies) { - IFailureMonitor::failureMonitor().setStatus(proxy.address(), FailureStatus(false)); - } - } outInfo->set(ni); } when( wait( clusterInterface->onChange() ) ) { diff --git a/fdbrpc/FailureMonitor.actor.cpp b/fdbrpc/FailureMonitor.actor.cpp index 5c75045118..f9938a125b 100644 --- a/fdbrpc/FailureMonitor.actor.cpp +++ b/fdbrpc/FailureMonitor.actor.cpp @@ -62,34 +62,13 @@ Future IFailureMonitor::onFailedFor( Endpoint const& endpoint, double sust return waitForContinuousFailure( this, endpoint, sustainedFailureDuration, slope ); } -ACTOR Future expireFailedDelayedMonitor(std::map* expireMap) { - state std::set toRemove; - loop { - for (const auto& p : *expireMap) { - if (p.second <= now()) { - toRemove.insert(p.first); - } - } - - for (const auto& addr : toRemove) { - expireMap->erase(addr); - IFailureMonitor::failureMonitor().setStatus(addr, FailureStatus(false)); - } - - toRemove.clear(); - wait(delay(2)); - } -} - SimpleFailureMonitor::SimpleFailureMonitor() : endpointKnownFailed() { - - if (FlowTransport::transport().isClient()) - expireMonitor = expireFailedDelayedMonitor(&expireMap); + // } SimpleFailureMonitor::~SimpleFailureMonitor() { - expireMonitor.cancel(); + // } void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStatus const& status ) { @@ -189,9 +168,4 @@ bool SimpleFailureMonitor::permanentlyFailed( Endpoint const& endpoint ) { void SimpleFailureMonitor::reset() { addressStatus = std::unordered_map< NetworkAddress, FailureStatus >(); endpointKnownFailed.resetNoWaiting(); - expireMap.clear(); -} - -void SimpleFailureMonitor::expireFailedDelayed(const NetworkAddress& address) { - expireMap[address] = now() + 60; } diff --git a/fdbrpc/FailureMonitor.h b/fdbrpc/FailureMonitor.h index ece182f312..8db72f5eab 100644 --- a/fdbrpc/FailureMonitor.h +++ b/fdbrpc/FailureMonitor.h @@ -120,9 +120,6 @@ public: // Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration + (elapsedTime*sustainedFailureSlope) Future onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 ); - // Expires failed status of peers after certain delay. - virtual void expireFailedDelayed(NetworkAddress const& address) = 0; - // Returns the failure monitor that the calling machine should use static IFailureMonitor& failureMonitor() { return *static_cast((void*) g_network->global(INetwork::enFailureMonitor)); } }; @@ -130,6 +127,7 @@ public: // SimpleFailureMonitor is the sole implementation of IFailureMonitor. It has no // failure detection logic; it just implements the interface and reacts to setStatus() etc. // Initially all addresses are considered failed, but all endpoints of a non-failed address are considered OK. + class SimpleFailureMonitor : public IFailureMonitor { public: SimpleFailureMonitor(); @@ -144,16 +142,12 @@ public: virtual Future onDisconnectOrFailure( Endpoint const& endpoint ); virtual bool onlyEndpointFailed( Endpoint const& endpoint ); virtual bool permanentlyFailed( Endpoint const& endpoint ); - virtual void expireFailedDelayed(NetworkAddress const& address); void reset(); private: std::unordered_map< NetworkAddress, FailureStatus > addressStatus; YieldedAsyncMap< Endpoint, bool > endpointKnownFailed; - std::map expireMap; - Future expireMonitor; - friend class OnStateChangedActorActor; }; diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 3ddc2aee75..08bd6daa08 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -465,21 +465,23 @@ struct Peer : NonCopyable { TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID()) .detail("PeerAddr", self->destination) .detail("ConnSet", (bool)conn); + + // This is used only at client side and is used to override waiting for unsent data to update failure monitoring status. Overriding is only + // useful when making first connection ever, or an existing connection fails. If `connect` itself fails, it is clear peer is down anyway. + state bool clientReconnectDelay = false; loop { try { if (!conn) { // Always, except for the first loop with an incoming connection self->outgoingConnectionIdle = true; - // Wait until there is something to send + // Wait until there is something to send. while ( self->unsent.empty() ) { + Future clientReconnectDelayF = FlowTransport::transport().isClient() && self->destination.isPublic() && clientReconnectDelay + ? delay(0) + : Never(); choose { - when (wait( self->dataToSend.onTrigger() )) { - // - } - when (wait(FlowTransport::transport().isClient() ? delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now())) : Never())) { - if (self->destination.isPublic()) - break; - } + when (wait( self->dataToSend.onTrigger() )) { } + when (wait( clientReconnectDelayF )) { break; } } } @@ -497,6 +499,7 @@ struct Peer : NonCopyable { } if (self->unsent.empty()) { _conn->close(); + clientReconnectDelay = false; continue; } else { conn = _conn; @@ -505,6 +508,9 @@ struct Peer : NonCopyable { } } else { TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); + if (FlowTransport::transport().isClient()) { + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); + } throw connection_failed(); } @@ -544,10 +550,6 @@ struct Peer : NonCopyable { TraceEvent(ok ? SevInfo : SevWarnAlways, "IncompatibleConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination); } - if (FlowTransport::transport().isClient() && e.code() == error_code_connection_failed) { - IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); - } - if(self->destination.isPublic() && IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()) { auto& it = self->transport->closedPeers[self->destination]; if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) { @@ -560,6 +562,9 @@ struct Peer : NonCopyable { } if (conn) { + if (FlowTransport::transport().isClient()) { + clientReconnectDelay = true; + } conn->close(); conn = Reference(); } @@ -571,9 +576,6 @@ struct Peer : NonCopyable { TraceEvent("PeerDestroy").error(e).suppressFor(1.0).detail("PeerAddr", self->destination); self->connect.cancel(); self->transport->peers.erase(self->destination); - if (FlowTransport::transport().isClient() && e.code() == error_code_connection_failed) { - IFailureMonitor::failureMonitor().expireFailedDelayed(self->destination); - } delete self; return Void(); } @@ -1015,6 +1017,10 @@ Endpoint FlowTransport::loadedEndpoint( const UID& token ) { } void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) { + if (FlowTransport::transport().isClient()) { + IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false)); + } + if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return; Peer* peer = self->getPeer(endpoint.getPrimaryAddress()); if(peer->peerReferences == -1) { From a8e408e268addca09d91f0febcaa7fe4d4aa2840 Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Wed, 29 May 2019 13:43:21 -0700 Subject: [PATCH 4/5] run clang-format on changes --- fdbclient/FailureMonitorClient.actor.cpp | 2 +- fdbrpc/FailureMonitor.actor.cpp | 9 ------ fdbrpc/FailureMonitor.h | 9 +++--- fdbrpc/FlowTransport.actor.cpp | 40 ++++++++++++++---------- fdbrpc/FlowTransport.h | 4 +-- fdbserver/ClusterController.actor.cpp | 5 ++- flow/network.h | 16 ++++++++-- 7 files changed, 48 insertions(+), 37 deletions(-) diff --git a/fdbclient/FailureMonitorClient.actor.cpp b/fdbclient/FailureMonitorClient.actor.cpp index 6c2f3015b3..3be7a4dccd 100644 --- a/fdbclient/FailureMonitorClient.actor.cpp +++ b/fdbclient/FailureMonitorClient.actor.cpp @@ -169,7 +169,7 @@ ACTOR Future failureMonitorClientLoop( ACTOR Future failureMonitorClient( Reference>> ci, bool trackMyStatus ) { TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient()); if (FlowTransport::transport().isClient()) { - wait (Never()); + wait(Never()); } state SimpleFailureMonitor* monitor = static_cast( &IFailureMonitor::failureMonitor() ); diff --git a/fdbrpc/FailureMonitor.actor.cpp b/fdbrpc/FailureMonitor.actor.cpp index f9938a125b..45ec1d432a 100644 --- a/fdbrpc/FailureMonitor.actor.cpp +++ b/fdbrpc/FailureMonitor.actor.cpp @@ -62,15 +62,6 @@ Future IFailureMonitor::onFailedFor( Endpoint const& endpoint, double sust return waitForContinuousFailure( this, endpoint, sustainedFailureDuration, slope ); } -SimpleFailureMonitor::SimpleFailureMonitor() - : endpointKnownFailed() { - // -} - -SimpleFailureMonitor::~SimpleFailureMonitor() { - // -} - void SimpleFailureMonitor::setStatus( NetworkAddress const& address, FailureStatus const& status ) { //if (status.failed) diff --git a/fdbrpc/FailureMonitor.h b/fdbrpc/FailureMonitor.h index 8db72f5eab..23fb38624f 100644 --- a/fdbrpc/FailureMonitor.h +++ b/fdbrpc/FailureMonitor.h @@ -106,7 +106,7 @@ public: virtual void notifyDisconnect( NetworkAddress const& ) = 0; // Called to update the failure status of network address directly when running client. - virtual void setStatus( NetworkAddress const& address, FailureStatus const& status ) = 0; + virtual void setStatus(NetworkAddress const& address, FailureStatus const& status) = 0; // Returns when the known status of endpoint is next equal to status. Returns immediately // if appropriate. @@ -121,7 +121,9 @@ public: Future onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 ); // Returns the failure monitor that the calling machine should use - static IFailureMonitor& failureMonitor() { return *static_cast((void*) g_network->global(INetwork::enFailureMonitor)); } + static IFailureMonitor& failureMonitor() { + return *static_cast((void*)g_network->global(INetwork::enFailureMonitor)); + } }; // SimpleFailureMonitor is the sole implementation of IFailureMonitor. It has no @@ -130,8 +132,7 @@ public: class SimpleFailureMonitor : public IFailureMonitor { public: - SimpleFailureMonitor(); - ~SimpleFailureMonitor(); + SimpleFailureMonitor() : endpointKnownFailed() { } void setStatus( NetworkAddress const& address, FailureStatus const& status ); void endpointNotFound( Endpoint const& ); virtual void notifyDisconnect( NetworkAddress const& ); diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 08bd6daa08..de31898b83 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -466,8 +466,9 @@ struct Peer : NonCopyable { .detail("PeerAddr", self->destination) .detail("ConnSet", (bool)conn); - // This is used only at client side and is used to override waiting for unsent data to update failure monitoring status. Overriding is only - // useful when making first connection ever, or an existing connection fails. If `connect` itself fails, it is clear peer is down anyway. + // This is used only at client side and is used to override waiting for unsent data to update failure monitoring + // status. At client, if an existing connection fails, we retry making a connection and if that fails, then only + // we report that address as failed. state bool clientReconnectDelay = false; loop { try { @@ -475,20 +476,24 @@ struct Peer : NonCopyable { self->outgoingConnectionIdle = true; // Wait until there is something to send. - while ( self->unsent.empty() ) { - Future clientReconnectDelayF = FlowTransport::transport().isClient() && self->destination.isPublic() && clientReconnectDelay - ? delay(0) - : Never(); + while (self->unsent.empty()) { + Future clientReconnectDelayF = FlowTransport::transport().isClient() && + self->destination.isPublic() && + clientReconnectDelay + ? delay(0) + : Never(); choose { - when (wait( self->dataToSend.onTrigger() )) { } - when (wait( clientReconnectDelayF )) { break; } + when(wait(self->dataToSend.onTrigger())) {} + when(wait(clientReconnectDelayF)) { break; } } } ASSERT( self->destination.isPublic() ); self->outgoingConnectionIdle = false; if (!FlowTransport::transport().isClient()) - wait( delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now()) ) ); // Don't connect() to the same peer more than once per 2 sec + wait(delayJittered( + std::max(0.0, self->lastConnectTime + self->reconnectionDelay - + now()))); // Don't connect() to the same peer more than once per 2 sec self->lastConnectTime = now(); TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); @@ -503,7 +508,9 @@ struct Peer : NonCopyable { continue; } else { conn = _conn; - TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).suppressFor(1.0).detail("PeerAddr", self->destination); + TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()) + .suppressFor(1.0) + .detail("PeerAddr", self->destination); self->prependConnectPacket(); } } else { @@ -523,8 +530,9 @@ struct Peer : NonCopyable { self->transport->countConnEstablished++; wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) ); } catch (Error& e) { - if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || e.code() == error_code_connection_unreferenced || - ( g_network->isSimulated() && e.code() == error_code_checksum_failed)) + if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || + e.code() == error_code_connection_unreferenced || + (g_network->isSimulated() && e.code() == error_code_checksum_failed)) self->transport->countConnClosedWithoutError++; else self->transport->countConnClosedWithError++; @@ -540,8 +548,9 @@ struct Peer : NonCopyable { } self->discardUnreliablePackets(); reader = Future(); - bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || e.code() == error_code_connection_unreferenced || - ( g_network->isSimulated() && e.code() == error_code_checksum_failed ); + bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || + e.code() == error_code_connection_unreferenced || + (g_network->isSimulated() && e.code() == error_code_checksum_failed); if(self->compatible) { TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination); @@ -1217,8 +1226,7 @@ bool FlowTransport::incompatibleOutgoingConnectionsPresent() { return self->numIncompatibleConnections > 0; } -void FlowTransport::createInstance( bool isClient, uint64_t transportId ) -{ +void FlowTransport::createInstance(bool isClient, uint64_t transportId) { if (isClient) { g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); g_network->setGlobal(INetwork::enClientFailureMonitor, (flowGlobalType)1); diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index e7a2302af2..98145de583 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -113,9 +113,7 @@ public: // Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global() variables, // so it will be private to a simulation. - static bool isClient() { - return g_network->global(INetwork::enClientFailureMonitor) != nullptr; - } + static bool isClient() { return g_network->global(INetwork::enClientFailureMonitor) != nullptr; } void initMetrics(); // Metrics must be initialized after FlowTransport::createInstance has been called diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index a6627e8a82..9fc12d502e 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1499,7 +1499,10 @@ ACTOR Future rebootAndCheck( ClusterControllerData* cluster, Optional workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) { - state Future failed = (worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass) ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME ); + state Future failed = + (worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass) + ? Never() + : waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME); cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) ); // This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch fails for the worker. wait(delay(0)); diff --git a/flow/network.h b/flow/network.h index 536e2f9ca2..2c7094b8f2 100644 --- a/flow/network.h +++ b/flow/network.h @@ -351,9 +351,19 @@ public: // to the network should be through FlowTransport, not directly through these low level interfaces! enum enumGlobal { - enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3, - enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10, - enNetworkAddressesFunc = 11, enClientFailureMonitor = 12 + enFailureMonitor = 0, + enFlowTransport = 1, + enTDMetrics = 2, + enNetworkConnections = 3, + enNetworkAddressFunc = 4, + enFileSystem = 5, + enASIOService = 6, + enEventFD = 7, + enRunCycleFunc = 8, + enASIOTimedOut = 9, + enBlobCredentialFiles = 10, + enNetworkAddressesFunc = 11, + enClientFailureMonitor = 12 }; virtual void longTaskCheck( const char* name ) {} From 42fafe8a426aab2278132e70d3168cc24c4ac4d9 Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Tue, 11 Jun 2019 18:58:00 -0700 Subject: [PATCH 5/5] Addressed review comments --- fdbrpc/FlowTransport.actor.cpp | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index de31898b83..25b2cf28b9 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -477,23 +477,18 @@ struct Peer : NonCopyable { // Wait until there is something to send. while (self->unsent.empty()) { - Future clientReconnectDelayF = FlowTransport::transport().isClient() && - self->destination.isPublic() && - clientReconnectDelay - ? delay(0) - : Never(); - choose { - when(wait(self->dataToSend.onTrigger())) {} - when(wait(clientReconnectDelayF)) { break; } + if (FlowTransport::transport().isClient() && self->destination.isPublic() && + clientReconnectDelay) { + break; } + wait(self->dataToSend.onTrigger()); } ASSERT( self->destination.isPublic() ); self->outgoingConnectionIdle = false; - if (!FlowTransport::transport().isClient()) - wait(delayJittered( - std::max(0.0, self->lastConnectTime + self->reconnectionDelay - - now()))); // Don't connect() to the same peer more than once per 2 sec + wait(delayJittered( + std::max(0.0, self->lastConnectTime + self->reconnectionDelay - + now()))); // Don't connect() to the same peer more than once per 2 sec self->lastConnectTime = now(); TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); @@ -1227,13 +1222,8 @@ bool FlowTransport::incompatibleOutgoingConnectionsPresent() { } void FlowTransport::createInstance(bool isClient, uint64_t transportId) { - if (isClient) { - g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); - g_network->setGlobal(INetwork::enClientFailureMonitor, (flowGlobalType)1); - } else { - g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); - g_network->setGlobal(INetwork::enClientFailureMonitor, nullptr); - } + g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); + g_network->setGlobal(INetwork::enClientFailureMonitor, isClient ? (flowGlobalType)1 : nullptr); g_network->setGlobal(INetwork::enFlowTransport, (flowGlobalType) new FlowTransport(transportId)); g_network->setGlobal(INetwork::enNetworkAddressFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddress); g_network->setGlobal(INetwork::enNetworkAddressesFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddresses);