diff --git a/fdbclient/FailureMonitorClient.actor.cpp b/fdbclient/FailureMonitorClient.actor.cpp index cd51792fcb..3be7a4dccd 100644 --- a/fdbclient/FailureMonitorClient.actor.cpp +++ b/fdbclient/FailureMonitorClient.actor.cpp @@ -167,6 +167,11 @@ ACTOR Future failureMonitorClientLoop( } ACTOR Future failureMonitorClient( Reference>> ci, bool trackMyStatus ) { + TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient()); + if (FlowTransport::transport().isClient()) { + wait(Never()); + } + state SimpleFailureMonitor* monitor = static_cast( &IFailureMonitor::failureMonitor() ); state Reference fmState = Reference(new FailureMonitorClientState()); auto localAddr = g_network->getLocalAddresses(); diff --git a/fdbrpc/FailureMonitor.h b/fdbrpc/FailureMonitor.h index 616b5d64fb..23fb38624f 100644 --- a/fdbrpc/FailureMonitor.h +++ b/fdbrpc/FailureMonitor.h @@ -105,6 +105,9 @@ public: // Called by FlowTransport when a connection closes and a prior request or reply might be lost virtual void notifyDisconnect( NetworkAddress const& ) = 0; + // Called to update the failure status of network address directly when running client. + virtual void setStatus(NetworkAddress const& address, FailureStatus const& status) = 0; + // Returns when the known status of endpoint is next equal to status. Returns immediately // if appropriate. Future onStateEqual( Endpoint const& endpoint, FailureStatus status ); @@ -114,16 +117,19 @@ public: return onStateEqual( endpoint, FailureStatus() ); } - static IFailureMonitor& failureMonitor() { return *static_cast((void*) g_network->global(INetwork::enFailureMonitor)); } - // Returns the failure monitor that the calling machine should use - // Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration + (elapsedTime*sustainedFailureSlope) Future onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 ); + + // Returns the failure monitor that the calling machine should use + static IFailureMonitor& failureMonitor() { + return *static_cast((void*)g_network->global(INetwork::enFailureMonitor)); + } }; // SimpleFailureMonitor is the sole implementation of IFailureMonitor. It has no // failure detection logic; it just implements the interface and reacts to setStatus() etc. // Initially all addresses are considered failed, but all endpoints of a non-failed address are considered OK. + class SimpleFailureMonitor : public IFailureMonitor { public: SimpleFailureMonitor() : endpointKnownFailed() { } diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 52750230b2..25b2cf28b9 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -394,7 +394,7 @@ struct Peer : NonCopyable { loop { if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) { - throw connection_failed(); + throw connection_unreferenced(); } wait( delayJittered( FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME ) ); @@ -465,26 +465,54 @@ struct Peer : NonCopyable { TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID()) .detail("PeerAddr", self->destination) .detail("ConnSet", (bool)conn); + + // This is used only at client side and is used to override waiting for unsent data to update failure monitoring + // status. At client, if an existing connection fails, we retry making a connection and if that fails, then only + // we report that address as failed. + state bool clientReconnectDelay = false; loop { try { if (!conn) { // Always, except for the first loop with an incoming connection self->outgoingConnectionIdle = true; - // Wait until there is something to send - while ( self->unsent.empty() ) - wait( self->dataToSend.onTrigger() ); + + // Wait until there is something to send. + while (self->unsent.empty()) { + if (FlowTransport::transport().isClient() && self->destination.isPublic() && + clientReconnectDelay) { + break; + } + wait(self->dataToSend.onTrigger()); + } + ASSERT( self->destination.isPublic() ); self->outgoingConnectionIdle = false; - wait( delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now()) ) ); // Don't connect() to the same peer more than once per 2 sec + wait(delayJittered( + std::max(0.0, self->lastConnectTime + self->reconnectionDelay - + now()))); // Don't connect() to the same peer more than once per 2 sec self->lastConnectTime = now(); TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); Reference _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference() ) ); if (_conn) { - conn = _conn; - TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).suppressFor(1.0).detail("PeerAddr", self->destination); - self->prependConnectPacket(); + if (FlowTransport::transport().isClient()) { + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false)); + } + if (self->unsent.empty()) { + _conn->close(); + clientReconnectDelay = false; + continue; + } else { + conn = _conn; + TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()) + .suppressFor(1.0) + .detail("PeerAddr", self->destination); + self->prependConnectPacket(); + } } else { TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination); + if (FlowTransport::transport().isClient()) { + IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true)); + } throw connection_failed(); } @@ -497,7 +525,9 @@ struct Peer : NonCopyable { self->transport->countConnEstablished++; wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) ); } catch (Error& e) { - if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed )) + if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || + e.code() == error_code_connection_unreferenced || + (g_network->isSimulated() && e.code() == error_code_checksum_failed)) self->transport->countConnClosedWithoutError++; else self->transport->countConnClosedWithError++; @@ -513,7 +543,9 @@ struct Peer : NonCopyable { } self->discardUnreliablePackets(); reader = Future(); - bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed ); + bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || + e.code() == error_code_connection_unreferenced || + (g_network->isSimulated() && e.code() == error_code_checksum_failed); if(self->compatible) { TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination); @@ -534,6 +566,9 @@ struct Peer : NonCopyable { } if (conn) { + if (FlowTransport::transport().isClient()) { + clientReconnectDelay = true; + } conn->close(); conn = Reference(); } @@ -986,6 +1021,10 @@ Endpoint FlowTransport::loadedEndpoint( const UID& token ) { } void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) { + if (FlowTransport::transport().isClient()) { + IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false)); + } + if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return; Peer* peer = self->getPeer(endpoint.getPrimaryAddress()); if(peer->peerReferences == -1) { @@ -1182,9 +1221,9 @@ bool FlowTransport::incompatibleOutgoingConnectionsPresent() { return self->numIncompatibleConnections > 0; } -void FlowTransport::createInstance( uint64_t transportId ) -{ +void FlowTransport::createInstance(bool isClient, uint64_t transportId) { g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor()); + g_network->setGlobal(INetwork::enClientFailureMonitor, isClient ? (flowGlobalType)1 : nullptr); g_network->setGlobal(INetwork::enFlowTransport, (flowGlobalType) new FlowTransport(transportId)); g_network->setGlobal(INetwork::enNetworkAddressFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddress); g_network->setGlobal(INetwork::enNetworkAddressesFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddresses); diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 5ff872821e..98145de583 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -109,10 +109,12 @@ public: FlowTransport(uint64_t transportId); ~FlowTransport(); - static void createInstance(uint64_t transportId = 0); + static void createInstance(bool isClient, uint64_t transportId = 0); // Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global() variables, // so it will be private to a simulation. + static bool isClient() { return g_network->global(INetwork::enClientFailureMonitor) != nullptr; } + void initMetrics(); // Metrics must be initialized after FlowTransport::createInstance has been called diff --git a/fdbserver/ClusterController.actor.cpp b/fdbserver/ClusterController.actor.cpp index 5d4243f581..9fc12d502e 100644 --- a/fdbserver/ClusterController.actor.cpp +++ b/fdbserver/ClusterController.actor.cpp @@ -1499,7 +1499,10 @@ ACTOR Future rebootAndCheck( ClusterControllerData* cluster, Optional workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) { - state Future failed = worker.address() == g_network->getLocalAddress() ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME ); + state Future failed = + (worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass) + ? Never() + : waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME); cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) ); // This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch fails for the worker. wait(delay(0)); diff --git a/fdbserver/SimulatedCluster.actor.cpp b/fdbserver/SimulatedCluster.actor.cpp index 25650a8dc7..fdcdb5061d 100644 --- a/fdbserver/SimulatedCluster.actor.cpp +++ b/fdbserver/SimulatedCluster.actor.cpp @@ -240,7 +240,7 @@ ACTOR Future simulatedFDBDRebooter(Referenceregister_network(); @@ -1402,7 +1402,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", ""), TaskDefaultYield)); Sim2FileSystem::newFileSystem(); - FlowTransport::createInstance(1); + FlowTransport::createInstance(true, 1); if (tlsOptions->enabled()) { simInitTLS(tlsOptions); } diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index a13e2c815d..e0a35c7c7e 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -1523,7 +1523,7 @@ int main(int argc, char* argv[]) { openTraceFile(NetworkAddress(), rollsize, maxLogsSize, logFolder, "trace", logGroup); } else { g_network = newNet2(useThreadPool, true, useObjectSerializer); - FlowTransport::createInstance(1); + FlowTransport::createInstance(false, 1); const bool expectsPublicAddress = (role == FDBD || role == NetworkTestServer || role == Restore); if (publicAddressStrs.empty()) { diff --git a/flow/error_definitions.h b/flow/error_definitions.h index 37ef476fb2..25f000935b 100755 --- a/flow/error_definitions.h +++ b/flow/error_definitions.h @@ -68,6 +68,7 @@ ERROR( serialization_failed, 1044, "Failed to deserialize an object" ) ERROR( transaction_not_permitted, 1045, "Operation not permitted") ERROR( cluster_not_fully_recovered, 1046, "Cluster not fully recovered") ERROR( txn_exec_log_anti_quorum, 1047, "Execute Transaction not supported when log anti quorum is configured") +ERROR( connection_unreferenced, 1048, "No peer references for connection" ) ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" ) diff --git a/flow/network.h b/flow/network.h index 4b33c9c8c9..2c7094b8f2 100644 --- a/flow/network.h +++ b/flow/network.h @@ -351,9 +351,19 @@ public: // to the network should be through FlowTransport, not directly through these low level interfaces! enum enumGlobal { - enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3, - enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10, - enNetworkAddressesFunc = 11 + enFailureMonitor = 0, + enFlowTransport = 1, + enTDMetrics = 2, + enNetworkConnections = 3, + enNetworkAddressFunc = 4, + enFileSystem = 5, + enASIOService = 6, + enEventFD = 7, + enRunCycleFunc = 8, + enASIOTimedOut = 9, + enBlobCredentialFiles = 10, + enNetworkAddressesFunc = 11, + enClientFailureMonitor = 12 }; virtual void longTaskCheck( const char* name ) {}