Clean up old Failure Monitoring code
This commit is contained in:
parent
85c24dc074
commit
6e6cfaff16
|
@ -18,8 +18,6 @@ set(FDBCLIENT_SRCS
|
|||
DatabaseConfiguration.h
|
||||
DatabaseContext.h
|
||||
EventTypes.actor.h
|
||||
FailureMonitorClient.actor.cpp
|
||||
FailureMonitorClient.h
|
||||
FDBOptions.h
|
||||
FDBTypes.h
|
||||
FileBackupAgent.actor.cpp
|
||||
|
|
|
@ -1,181 +0,0 @@
|
|||
/*
|
||||
* FailureMonitorClient.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbrpc/FailureMonitor.h"
|
||||
#include "fdbclient/ClusterInterface.h"
|
||||
#include "flow/actorcompiler.h" // has to be last include
|
||||
#include <unordered_set>
|
||||
|
||||
// Reference-counted bookkeeping shared by the failure monitor client actors in this file.
struct FailureMonitorClientState : ReferenceCounted<FailureMonitorClientState> {
	// Addresses the client is currently tracking: the controller's own address(es) plus every
	// address last reported with a status other than the default FailureStatus().
	std::unordered_set<NetworkAddress> knownAddrs;

	// Timeout passed to delay() when waiting for a reply. Starts at the client knob and is
	// later overwritten from a millisecond reply field scaled by .001 (presumably seconds).
	double serverFailedTimeout;

	FailureMonitorClientState() : knownAddrs(), serverFailedTimeout(CLIENT_KNOBS->FAILURE_TIMEOUT_DELAY) {}
};
|
||||
|
||||
// Actor that repeatedly sends a FailureMonitoringRequest to `controller` and applies each
// FailureMonitoringReply to `monitor`.
//   monitor       - failure monitor whose per-address status is updated through setStatus().
//   controller    - interface whose failureMonitoring endpoint receives the requests.
//   fmState       - shared known-address set and server-failed timeout; both are updated from replies.
//   trackMyStatus - when true, every request carries senderStatus = FailureStatus(false).
// Returns Void() when a broken_promise error is caught; any other error is traced at SevError and rethrown.
ACTOR Future<Void> failureMonitorClientLoop(
|
||||
SimpleFailureMonitor* monitor,
|
||||
ClusterInterface controller,
|
||||
Reference<FailureMonitorClientState> fmState,
|
||||
bool trackMyStatus)
|
||||
{
|
||||
// failureInformationVersion of the most recent reply; stays 0 until the first reply is processed.
state Version version = 0;
|
||||
state Future<FailureMonitoringReply> request = Never();
|
||||
// Zero-second delay: the first request is issued without waiting for a reply-supplied interval.
state Future<Void> nextRequest = delay(0, TaskPriority::FailureMonitor);
|
||||
state Future<Void> requestTimeout = Never();
|
||||
// `before` / `waitfor` record when the last reply was handled and the interval it asked for;
// they are only used for the FailureMonitorClientSlow diagnostic below.
state double before = now();
|
||||
state double waitfor = 0;
|
||||
|
||||
state NetworkAddressList controlAddr = controller.failureMonitoring.getEndpoint().addresses;
|
||||
// Start out treating the controller's own address(es) as not failed and track them in knownAddrs.
monitor->setStatus(controlAddr.address, FailureStatus(false));
|
||||
fmState->knownAddrs.insert(controlAddr.address);
|
||||
if(controlAddr.secondaryAddress.present()) {
|
||||
monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(false));
|
||||
fmState->knownAddrs.insert(controlAddr.secondaryAddress.get());
|
||||
}
|
||||
|
||||
//The cluster controller's addresses (controller.failureMonitoring.getEndpoint().addresses) are treated specially because we can declare that it is down independently
|
||||
//of the response from the cluster controller. It still needs to be in knownAddrs in case the cluster controller changes, so the next cluster controller resets its state
|
||||
|
||||
try {
|
||||
loop {
|
||||
choose {
|
||||
// A reply arrived: disarm the request/timeout futures, apply the reported changes and schedule the next request.
when( FailureMonitoringReply reply = wait( request ) ) {
|
||||
g_network->setCurrentTask(TaskPriority::DefaultDelay);
|
||||
request = Never();
|
||||
requestTimeout = Never();
|
||||
if (reply.allOthersFailed) {
|
||||
// Reset all systems *not* mentioned in the reply to the default (failed) state
|
||||
// The controller's own addresses are removed first so the reset loop below skips them.
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().addresses.address );
|
||||
if(controller.failureMonitoring.getEndpoint().addresses.secondaryAddress.present()) {
|
||||
fmState->knownAddrs.erase( controller.failureMonitoring.getEndpoint().addresses.secondaryAddress.get() );
|
||||
}
|
||||
|
||||
std::set<NetworkAddress> changedAddresses;
|
||||
for(int c=0; c<reply.changes.size(); c++) {
|
||||
changedAddresses.insert( reply.changes[c].addresses.address );
|
||||
if(reply.changes[c].addresses.secondaryAddress.present()) {
|
||||
changedAddresses.insert( reply.changes[c].addresses.secondaryAddress.get() );
|
||||
}
|
||||
}
|
||||
for(auto& it : fmState->knownAddrs)
|
||||
if (!changedAddresses.count( it ))
|
||||
monitor->setStatus( it, FailureStatus() );
|
||||
fmState->knownAddrs.clear();
|
||||
} else {
|
||||
// NOTE(review): presumably a reply without allOthersFailed is only valid once an earlier reply set a non-zero version -- confirm.
ASSERT( version != 0 );
|
||||
}
|
||||
|
||||
if( monitor->getState( controller.failureMonitoring.getEndpoint() ).isFailed() )
|
||||
TraceEvent("FailureMonitoringServerUp").detail("OldServer",controller.id());
|
||||
|
||||
// Having just received a reply, mark the controller's address(es) as not failed and track them again.
monitor->setStatus(controlAddr.address, FailureStatus(false));
|
||||
fmState->knownAddrs.insert(controlAddr.address);
|
||||
if(controlAddr.secondaryAddress.present()) {
|
||||
monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(false));
|
||||
fmState->knownAddrs.insert(controlAddr.secondaryAddress.get());
|
||||
}
|
||||
|
||||
//if (version != reply.failureInformationVersion)
|
||||
// printf("Client '%s': update from %lld to %lld (%d changes, aof=%d)\n", g_network->getLocalAddress().toString().c_str(), version, reply.failureInformationVersion, reply.changes.size(), reply.allOthersFailed);
|
||||
|
||||
version = reply.failureInformationVersion;
|
||||
// The reply field is named in milliseconds; * .001 scales it for use with delay() below.
fmState->serverFailedTimeout = reply.considerServerFailedTimeoutMS * .001;
|
||||
// Apply every reported change; knownAddrs keeps only addresses whose status differs from the default FailureStatus().
for(int c=0; c<reply.changes.size(); c++) {
|
||||
//printf("Client '%s': status of '%s' is now '%s'\n", g_network->getLocalAddress().toString().c_str(), reply.changes[c].address.toString().c_str(), reply.changes[c].status.failed ? "Failed" : "OK");
|
||||
auto& addrList = reply.changes[c].addresses;
|
||||
monitor->setStatus( addrList.address, reply.changes[c].status );
|
||||
if(addrList.secondaryAddress.present()) {
|
||||
monitor->setStatus( addrList.secondaryAddress.get(), reply.changes[c].status );
|
||||
}
|
||||
if (reply.changes[c].status != FailureStatus()) {
|
||||
fmState->knownAddrs.insert( addrList.address );
|
||||
if(addrList.secondaryAddress.present()) {
|
||||
fmState->knownAddrs.insert( addrList.secondaryAddress.get() );
|
||||
}
|
||||
} else {
|
||||
fmState->knownAddrs.erase( addrList.address );
|
||||
if(addrList.secondaryAddress.present()) {
|
||||
fmState->knownAddrs.erase( addrList.secondaryAddress.get() );
|
||||
}
|
||||
}
|
||||
}
|
||||
before = now();
|
||||
waitfor = reply.clientRequestIntervalMS * .001;
|
||||
nextRequest = delayJittered( waitfor, TaskPriority::FailureMonitor );
|
||||
}
|
||||
// No reply within serverFailedTimeout: mark the controller's address(es) as failed and stop tracking them.
// `request` is left pending, so a late reply is still handled by the branch above.
when( wait( requestTimeout ) ) {
|
||||
g_network->setCurrentTask(TaskPriority::DefaultDelay);
|
||||
requestTimeout = Never();
|
||||
TraceEvent(SevWarn, "FailureMonitoringServerDown").detail("OldServerID",controller.id());
|
||||
monitor->setStatus(controlAddr.address, FailureStatus(true));
|
||||
fmState->knownAddrs.erase(controlAddr.address);
|
||||
if(controlAddr.secondaryAddress.present()) {
|
||||
monitor->setStatus(controlAddr.secondaryAddress.get(), FailureStatus(true));
|
||||
fmState->knownAddrs.erase(controlAddr.secondaryAddress.get());
|
||||
}
|
||||
}
|
||||
// Time to send the next request.
when( wait( nextRequest ) ) {
|
||||
g_network->setCurrentTask(TaskPriority::DefaultDelay);
|
||||
nextRequest = Never();
|
||||
|
||||
double elapsed = now() - before;
|
||||
double slowThreshold = .200 + waitfor + FLOW_KNOBS->MAX_BUGGIFIED_DELAY;
|
||||
double warnAlwaysThreshold = CLIENT_KNOBS->FAILURE_MIN_DELAY/2;
|
||||
|
||||
// Sampled diagnostic: logged only when this wake-up is later than slowThreshold, and then only when a
// random draw is below elapsed / warnAlwaysThreshold (presumably always once elapsed reaches that threshold).
if (elapsed > slowThreshold && deterministicRandom()->random01() < elapsed / warnAlwaysThreshold) {
|
||||
TraceEvent(elapsed > warnAlwaysThreshold ? SevWarnAlways : SevWarn, "FailureMonitorClientSlow").detail("Elapsed", elapsed).detail("Expected", waitfor);
|
||||
}
|
||||
|
||||
FailureMonitoringRequest req;
|
||||
req.failureInformationVersion = version;
|
||||
req.addresses = g_network->getLocalAddresses();
|
||||
if (trackMyStatus)
|
||||
req.senderStatus = FailureStatus(false);
|
||||
request = controller.failureMonitoring.getReply( req, TaskPriority::FailureMonitor );
|
||||
// The timeout is only armed when the controller's endpoint is not local.
if(!controller.failureMonitoring.getEndpoint().isLocal())
|
||||
requestTimeout = delay( fmState->serverFailedTimeout, TaskPriority::FailureMonitor );
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Error& e) {
|
||||
if (e.code() == error_code_broken_promise) // broken promise from clustercontroller means it has died (and hopefully will be replaced)
|
||||
return Void();
|
||||
TraceEvent(SevError, "FailureMonitorClientError").error(e);
|
||||
throw; // goes nowhere
|
||||
}
|
||||
}
|
||||
|
||||
// Entry point kept for existing callers. As written it logs a FailureMonitorStart trace event,
// marks this process's own address(es) as not failed in the global failure monitor, and then
// returns Never(). Neither `ci` nor `trackMyStatus` is read by this body.
ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
	TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient());

	state SimpleFailureMonitor* monitor = static_cast<SimpleFailureMonitor*>(&IFailureMonitor::failureMonitor());
	// Constructed as in the original, although nothing below reads it.
	state Reference<FailureMonitorClientState> fmState = Reference<FailureMonitorClientState>(new FailureMonitorClientState());

	// The local primary address, and the secondary one when present, are reported as not failed.
	auto ownAddresses = g_network->getLocalAddresses();
	FailureStatus notFailed(false);
	monitor->setStatus(ownAddresses.address, notFailed);
	if (ownAddresses.secondaryAddress.present())
		monitor->setStatus(ownAddresses.secondaryAddress.get(), notFailed);

	return Never();
}
|
|
@ -1,31 +0,0 @@
|
|||
/*
|
||||
* FailureMonitorClient.h
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef FDBCLIENT_FAILUREMONITORCLIENT_H
|
||||
#define FDBCLIENT_FAILUREMONITORCLIENT_H
|
||||
#pragma once
|
||||
|
||||
#include "flow/flow.h"
|
||||
|
||||
// Communicates with the given cluster controller to reassure it about this machine's status
|
||||
// and to obtain status information about other machines, which is sent to g_network->failureMonitor()
|
||||
// NOTE(review): the definition of this actor in FailureMonitorClient.actor.cpp only marks the local
// address(es) as not failed via IFailureMonitor::failureMonitor() and then returns Never(); it reads
// neither argument. The description above therefore looks stale -- confirm before relying on it.
Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> const&, bool const& trackMyStatus );
|
||||
|
||||
#endif
|
|
@ -26,7 +26,6 @@
|
|||
#include "fdbclient/ClusterInterface.h"
|
||||
#include "fdbclient/CoordinationInterface.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
#include "fdbclient/Knobs.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
#include "flow/flow.h"
|
||||
#include "fdbclient/CoordinationInterface.h"
|
||||
#include "fdbclient/MonitorLeader.h"
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbclient/ClusterInterface.h"
|
||||
#include "fdbclient/StatusClient.h"
|
||||
#include "fdbclient/Status.h"
|
||||
|
|
|
@ -48,7 +48,6 @@
|
|||
<ClInclude Include="HTTP.h" />
|
||||
<ClInclude Include="KeyBackedTypes.h" />
|
||||
<ClInclude Include="MetricLogger.h" />
|
||||
<ClInclude Include="FailureMonitorClient.h" />
|
||||
<ClInclude Include="IClientApi.h" />
|
||||
<ClInclude Include="JsonBuilder.h" />
|
||||
<ClInclude Include="JSONDoc.h" />
|
||||
|
@ -111,7 +110,6 @@
|
|||
<ActorCompiler Include="BlobStore.actor.cpp" />
|
||||
<ActorCompiler Include="DatabaseBackupAgent.actor.cpp" />
|
||||
<ClCompile Include="DatabaseConfiguration.cpp" />
|
||||
<ActorCompiler Include="FailureMonitorClient.actor.cpp" />
|
||||
<ClCompile Include="FDBOptions.g.cpp" />
|
||||
<ActorCompiler Include="FileBackupAgent.actor.cpp" />
|
||||
<ActorCompiler Include="HTTP.actor.cpp" />
|
||||
|
|
|
@ -1732,6 +1732,7 @@ ACTOR Future<Void> failureDetectionServer( UID uniqueID, ClusterControllerData*
|
|||
|
||||
loop choose {
|
||||
when ( FailureMonitoringRequest req = waitNext( requests ) ) {
|
||||
// TODO: Handling this request should no longer be necessary.
|
||||
++self->failureMonitoringRequests;
|
||||
if ( req.senderStatus.present() ) {
|
||||
// Update the status of requester, if necessary
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
|
||||
#include <fstream>
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbclient/DatabaseContext.h"
|
||||
#include "fdbserver/TesterInterface.actor.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
|
|
|
@ -31,7 +31,6 @@
|
|||
#include "flow/SystemMonitor.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbserver/CoordinationInterface.h"
|
||||
#include "fdbserver/WorkerInterface.actor.h"
|
||||
#include "fdbclient/RestoreWorkerInterface.actor.h"
|
||||
|
|
|
@ -33,7 +33,6 @@
|
|||
#include "fdbserver/Status.h"
|
||||
#include "fdbserver/QuietDatabase.h"
|
||||
#include "fdbclient/MonitorLeader.h"
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbserver/CoordinationInterface.h"
|
||||
#include "fdbclient/ManagementAPI.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -1154,7 +1153,6 @@ ACTOR Future<Void> runTests( Reference<ClusterConnectionFile> connFile, test_typ
|
|||
vector<Future<Void>> actors;
|
||||
actors.push_back( reportErrors(monitorLeader( connFile, cc ), "MonitorLeader") );
|
||||
actors.push_back( reportErrors(extractClusterInterface( cc,ci ),"ExtractClusterInterface") );
|
||||
actors.push_back( reportErrors(failureMonitorClient( ci, false ),"FailureMonitorClient") );
|
||||
|
||||
if(whatToRun == TEST_TYPE_CONSISTENCY_CHECK) {
|
||||
TestSpec spec;
|
||||
|
|
|
@ -37,7 +37,6 @@
|
|||
#include "fdbserver/ServerDBInfo.h"
|
||||
#include "fdbserver/FDBExecHelper.actor.h"
|
||||
#include "fdbserver/CoordinationInterface.h"
|
||||
#include "fdbclient/FailureMonitorClient.h"
|
||||
#include "fdbclient/MonitorLeader.h"
|
||||
#include "fdbclient/ClientWorkerInterface.h"
|
||||
#include "flow/Profiler.h"
|
||||
|
@ -1507,7 +1506,6 @@ ACTOR Future<Void> fdbd(
|
|||
actors.push_back(reportErrors(monitorAndWriteCCPriorityInfo(fitnessFilePath, asyncPriorityInfo), "MonitorAndWriteCCPriorityInfo"));
|
||||
actors.push_back( reportErrors( processClass == ProcessClass::TesterClass ? monitorLeader( connFile, cc ) : clusterController( connFile, cc , asyncPriorityInfo, recoveredDiskFiles.getFuture(), localities ), "ClusterController") );
|
||||
actors.push_back( reportErrors(extractClusterInterface( cc, ci ), "ExtractClusterInterface") );
|
||||
actors.push_back( reportErrors(failureMonitorClient( ci, true ), "FailureMonitorClient") );
|
||||
actors.push_back( reportErrorsExcept(workerServer(connFile, cc, localities, asyncPriorityInfo, processClass, dataFolder, memoryLimit, metricsConnFile, metricsPrefix, recoveredDiskFiles, memoryProfileThreshold, coordFolder, whitelistBinPaths), "WorkerServer", UID(), &normalWorkerErrors()) );
|
||||
state Future<Void> firstConnect = reportErrors( printOnFirstConnected(ci), "ClusterFirstConnectedError" );
|
||||
|
||||
|
|
Loading…
Reference in New Issue