Merge pull request #1640 from vishesh/task/client-failmon

Clients will no longer get failure monitoring info from cluster controller
This commit is contained in:
Evan Tschannen 2019-06-13 17:31:17 -07:00 committed by GitHub
commit 6ececa94ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 89 additions and 23 deletions

View File

@ -167,6 +167,11 @@ ACTOR Future<Void> failureMonitorClientLoop(
}
ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient());
if (FlowTransport::transport().isClient()) {
wait(Never());
}
state SimpleFailureMonitor* monitor = static_cast<SimpleFailureMonitor*>( &IFailureMonitor::failureMonitor() );
state Reference<FailureMonitorClientState> fmState = Reference<FailureMonitorClientState>(new FailureMonitorClientState());
auto localAddr = g_network->getLocalAddresses();

View File

@ -105,6 +105,9 @@ public:
// Called by FlowTransport when a connection closes and a prior request or reply might be lost
virtual void notifyDisconnect( NetworkAddress const& ) = 0;
// Called to update the failure status of network address directly when running client.
virtual void setStatus(NetworkAddress const& address, FailureStatus const& status) = 0;
// Returns when the known status of endpoint is next equal to status. Returns immediately
// if appropriate.
Future<Void> onStateEqual( Endpoint const& endpoint, FailureStatus status );
@ -114,16 +117,19 @@ public:
return onStateEqual( endpoint, FailureStatus() );
}
static IFailureMonitor& failureMonitor() { return *static_cast<IFailureMonitor*>((void*) g_network->global(INetwork::enFailureMonitor)); }
// Returns the failure monitor that the calling machine should use
// Returns when the status of the given endpoint has continuously been "failed" for sustainedFailureDuration + (elapsedTime*sustainedFailureSlope)
Future<Void> onFailedFor( Endpoint const& endpoint, double sustainedFailureDuration, double sustainedFailureSlope = 0.0 );
// Returns the failure monitor that the calling machine should use
static IFailureMonitor& failureMonitor() {
return *static_cast<IFailureMonitor*>((void*)g_network->global(INetwork::enFailureMonitor));
}
};
// SimpleFailureMonitor is the sole implementation of IFailureMonitor. It has no
// failure detection logic; it just implements the interface and reacts to setStatus() etc.
// Initially all addresses are considered failed, but all endpoints of a non-failed address are considered OK.
class SimpleFailureMonitor : public IFailureMonitor {
public:
SimpleFailureMonitor() : endpointKnownFailed() { }

View File

@ -394,7 +394,7 @@ struct Peer : NonCopyable {
loop {
if(peer->peerReferences == 0 && peer->reliable.empty() && peer->unsent.empty()) {
throw connection_failed();
throw connection_unreferenced();
}
wait( delayJittered( FLOW_KNOBS->CONNECTION_MONITOR_LOOP_TIME ) );
@ -465,26 +465,54 @@ struct Peer : NonCopyable {
TraceEvent(SevDebug, "ConnectionKeeper", conn ? conn->getDebugID() : UID())
.detail("PeerAddr", self->destination)
.detail("ConnSet", (bool)conn);
// This is used only at client side and is used to override waiting for unsent data to update failure monitoring
// status. At client, if an existing connection fails, we retry making a connection and if that fails, then only
// we report that address as failed.
state bool clientReconnectDelay = false;
loop {
try {
if (!conn) { // Always, except for the first loop with an incoming connection
self->outgoingConnectionIdle = true;
// Wait until there is something to send
while ( self->unsent.empty() )
wait( self->dataToSend.onTrigger() );
// Wait until there is something to send.
while (self->unsent.empty()) {
if (FlowTransport::transport().isClient() && self->destination.isPublic() &&
clientReconnectDelay) {
break;
}
wait(self->dataToSend.onTrigger());
}
ASSERT( self->destination.isPublic() );
self->outgoingConnectionIdle = false;
wait( delayJittered( std::max(0.0, self->lastConnectTime+self->reconnectionDelay - now()) ) ); // Don't connect() to the same peer more than once per 2 sec
wait(delayJittered(
std::max(0.0, self->lastConnectTime + self->reconnectionDelay -
now()))); // Don't connect() to the same peer more than once per 2 sec
self->lastConnectTime = now();
TraceEvent("ConnectingTo", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
Reference<IConnection> _conn = wait( timeout( INetworkConnections::net()->connect(self->destination), FLOW_KNOBS->CONNECTION_MONITOR_TIMEOUT, Reference<IConnection>() ) );
if (_conn) {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID()).suppressFor(1.0).detail("PeerAddr", self->destination);
self->prependConnectPacket();
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
_conn->close();
clientReconnectDelay = false;
continue;
} else {
conn = _conn;
TraceEvent("ConnectionExchangingConnectPacket", conn->getDebugID())
.suppressFor(1.0)
.detail("PeerAddr", self->destination);
self->prependConnectPacket();
}
} else {
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
}
throw connection_failed();
}
@ -497,7 +525,9 @@ struct Peer : NonCopyable {
self->transport->countConnEstablished++;
wait( connectionWriter( self, conn ) || reader || connectionMonitor(self) );
} catch (Error& e) {
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed ))
if (e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced ||
(g_network->isSimulated() && e.code() == error_code_checksum_failed))
self->transport->countConnClosedWithoutError++;
else
self->transport->countConnClosedWithError++;
@ -513,7 +543,9 @@ struct Peer : NonCopyable {
}
self->discardUnreliablePackets();
reader = Future<Void>();
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled || ( g_network->isSimulated() && e.code() == error_code_checksum_failed );
bool ok = e.code() == error_code_connection_failed || e.code() == error_code_actor_cancelled ||
e.code() == error_code_connection_unreferenced ||
(g_network->isSimulated() && e.code() == error_code_checksum_failed);
if(self->compatible) {
TraceEvent(ok ? SevInfo : SevWarnAlways, "ConnectionClosed", conn ? conn->getDebugID() : UID()).error(e, true).suppressFor(1.0).detail("PeerAddr", self->destination);
@ -534,6 +566,9 @@ struct Peer : NonCopyable {
}
if (conn) {
if (FlowTransport::transport().isClient()) {
clientReconnectDelay = true;
}
conn->close();
conn = Reference<IConnection>();
}
@ -986,6 +1021,10 @@ Endpoint FlowTransport::loadedEndpoint( const UID& token ) {
}
void FlowTransport::addPeerReference( const Endpoint& endpoint, NetworkMessageReceiver* receiver ) {
if (FlowTransport::transport().isClient()) {
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
}
if (!receiver->isStream() || !endpoint.getPrimaryAddress().isValid()) return;
Peer* peer = self->getPeer(endpoint.getPrimaryAddress());
if(peer->peerReferences == -1) {
@ -1182,9 +1221,9 @@ bool FlowTransport::incompatibleOutgoingConnectionsPresent() {
return self->numIncompatibleConnections > 0;
}
void FlowTransport::createInstance( uint64_t transportId )
{
void FlowTransport::createInstance(bool isClient, uint64_t transportId) {
g_network->setGlobal(INetwork::enFailureMonitor, (flowGlobalType) new SimpleFailureMonitor());
g_network->setGlobal(INetwork::enClientFailureMonitor, isClient ? (flowGlobalType)1 : nullptr);
g_network->setGlobal(INetwork::enFlowTransport, (flowGlobalType) new FlowTransport(transportId));
g_network->setGlobal(INetwork::enNetworkAddressFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddress);
g_network->setGlobal(INetwork::enNetworkAddressesFunc, (flowGlobalType) &FlowTransport::getGlobalLocalAddresses);

View File

@ -109,10 +109,12 @@ public:
FlowTransport(uint64_t transportId);
~FlowTransport();
static void createInstance(uint64_t transportId = 0);
static void createInstance(bool isClient, uint64_t transportId = 0);
// Creates a new FlowTransport and makes FlowTransport::transport() return it. This uses g_network->global() variables,
// so it will be private to a simulation.
static bool isClient() { return g_network->global(INetwork::enClientFailureMonitor) != nullptr; }
void initMetrics();
// Metrics must be initialized after FlowTransport::createInstance has been called

View File

@ -1499,7 +1499,10 @@ ACTOR Future<Void> rebootAndCheck( ClusterControllerData* cluster, Optional<Stan
}
ACTOR Future<Void> workerAvailabilityWatch( WorkerInterface worker, ProcessClass startingClass, ClusterControllerData* cluster ) {
state Future<Void> failed = worker.address() == g_network->getLocalAddress() ? Never() : waitFailureClient( worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME );
state Future<Void> failed =
(worker.address() == g_network->getLocalAddress() || startingClass.classType() == ProcessClass::TesterClass)
? Never()
: waitFailureClient(worker.waitFailure, SERVER_KNOBS->WORKER_FAILURE_TIME);
cluster->updateWorkerList.set( worker.locality.processId(), ProcessData(worker.locality, startingClass, worker.address()) );
// This switching avoids a race where the worker can be added to id_worker map after the workerAvailabilityWatch fails for the worker.
wait(delay(0));

View File

@ -240,7 +240,7 @@ ACTOR Future<ISimulator::KillType> simulatedFDBDRebooter(Reference<ClusterConnec
try {
//SOMEDAY: test lower memory limits, without making them too small and causing the database to stop making progress
FlowTransport::createInstance(1);
FlowTransport::createInstance(processClass == ProcessClass::TesterClass, 1);
Sim2FileSystem::newFileSystem();
if (sslEnabled) {
tlsOptions->register_network();
@ -1402,7 +1402,7 @@ ACTOR void setupAndRun(std::string dataFolder, const char *testFile, bool reboot
ProcessClass(ProcessClass::TesterClass, ProcessClass::CommandLineSource), "", ""),
TaskDefaultYield));
Sim2FileSystem::newFileSystem();
FlowTransport::createInstance(1);
FlowTransport::createInstance(true, 1);
if (tlsOptions->enabled()) {
simInitTLS(tlsOptions);
}

View File

@ -1523,7 +1523,7 @@ int main(int argc, char* argv[]) {
openTraceFile(NetworkAddress(), rollsize, maxLogsSize, logFolder, "trace", logGroup);
} else {
g_network = newNet2(useThreadPool, true, useObjectSerializer);
FlowTransport::createInstance(1);
FlowTransport::createInstance(false, 1);
const bool expectsPublicAddress = (role == FDBD || role == NetworkTestServer || role == Restore);
if (publicAddressStrs.empty()) {

View File

@ -68,6 +68,7 @@ ERROR( serialization_failed, 1044, "Failed to deserialize an object" )
ERROR( transaction_not_permitted, 1045, "Operation not permitted")
ERROR( cluster_not_fully_recovered, 1046, "Cluster not fully recovered")
ERROR( txn_exec_log_anti_quorum, 1047, "Execute Transaction not supported when log anti quorum is configured")
ERROR( connection_unreferenced, 1048, "No peer references for connection" )
ERROR( broken_promise, 1100, "Broken promise" )
ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )

View File

@ -351,9 +351,19 @@ public:
// to the network should be through FlowTransport, not directly through these low level interfaces!
enum enumGlobal {
enFailureMonitor = 0, enFlowTransport = 1, enTDMetrics = 2, enNetworkConnections = 3,
enNetworkAddressFunc = 4, enFileSystem = 5, enASIOService = 6, enEventFD = 7, enRunCycleFunc = 8, enASIOTimedOut = 9, enBlobCredentialFiles = 10,
enNetworkAddressesFunc = 11
enFailureMonitor = 0,
enFlowTransport = 1,
enTDMetrics = 2,
enNetworkConnections = 3,
enNetworkAddressFunc = 4,
enFileSystem = 5,
enASIOService = 6,
enEventFD = 7,
enRunCycleFunc = 8,
enASIOTimedOut = 9,
enBlobCredentialFiles = 10,
enNetworkAddressesFunc = 11,
enClientFailureMonitor = 12
};
virtual void longTaskCheck( const char* name ) {}